Union Operation in PySpark DataFrames: A Comprehensive Guide
PySpark’s DataFrame API is a powerful tool for big data processing, and the union operation is a key method for combining multiple DataFrames by stacking their rows vertically. Whether you’re merging datasets from different sources, appending new records, or consolidating data for analysis, union provides a straightforward way to unify DataFrames with compatible schemas. Built on the Spark SQL engine and optimized by Catalyst, it ensures scalability and efficiency across distributed systems. This guide covers what union does, the various ways to apply it, and its practical uses, with clear examples to illustrate each approach.
Ready to master union? Explore PySpark Fundamentals and let’s get started!
What is the Union Operation in PySpark?
The union method in PySpark DataFrames combines two or more DataFrames by stacking their rows vertically, returning a new DataFrame with all rows from the inputs. It’s a transformation operation, meaning it’s lazy; Spark plans the union but waits for an action like show to execute it. The inputs must have the same number of columns with compatible types, and union matches columns by position rather than by name, so the result takes its column names from the first DataFrame (use unionByName for name-based matching). It preserves duplicates, behaving like SQL’s UNION ALL rather than UNION, and is widely used for appending datasets, merging similar data, or building comprehensive collections for processing. The unionAll method is a deprecated alias with identical behavior.
Here’s a basic example:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("UnionIntro").getOrCreate()
data1 = [("Alice", 25), ("Bob", 30)]
data2 = [("Cathy", 22), ("David", 28)]
df1 = spark.createDataFrame(data1, ["name", "age"])
df2 = spark.createDataFrame(data2, ["name", "age"])
union_df = df1.union(df2)
union_df.show()
# Output:
# +-----+---+
# | name|age|
# +-----+---+
# |Alice| 25|
# |  Bob| 30|
# |Cathy| 22|
# |David| 28|
# +-----+---+
spark.stop()
A SparkSession initializes the environment, and two DataFrames (df1 and df2) are created with matching schemas (name, age). The union(df2) call stacks df2’s rows below df1’s, and show() displays the combined result with all four rows. For more on DataFrames, see DataFrames in PySpark. For setup details, visit Installing PySpark.
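Before running a union, it helps to confirm the inputs line up; printSchema shows each DataFrame’s column order and types, which is exactly what union matches on. A quick check on df1 from the example above (run before spark.stop()):
df1.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- age: long (nullable = true)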
Various Ways to Use Union in PySpark
The union operation offers multiple ways to combine DataFrames, each tailored to specific needs. Below are the key approaches with detailed explanations and examples.
1. Basic Union of Two DataFrames
The simplest use of union combines two DataFrames with identical schemas by stacking their rows vertically, preserving all data including duplicates. This is ideal when you need to append one dataset to another, such as adding new records to an existing table, without altering the original content.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BasicUnion").getOrCreate()
data1 = [("Alice", 25), ("Bob", 30)]
data2 = [("Cathy", 22), ("David", 28)]
df1 = spark.createDataFrame(data1, ["name", "age"])
df2 = spark.createDataFrame(data2, ["name", "age"])
basic_union_df = df1.union(df2)
basic_union_df.show()
# Output:
# +-----+---+
# | name|age|
# +-----+---+
# |Alice| 25|
# |  Bob| 30|
# |Cathy| 22|
# |David| 28|
# +-----+---+
spark.stop()
The DataFrame df1 has two rows, and df2 has two rows with the same schema. The union(df2) call stacks df2 below df1, resulting in 4 rows. The show() output displays all rows in the order they were combined. This method is straightforward for basic row concatenation.
2. Union of Multiple DataFrames
The union operation can combine more than two DataFrames by chaining multiple union calls, stacking all rows vertically into a single DataFrame. This is useful when you need to consolidate data from several sources, such as monthly datasets or logs from different systems, into one unified collection.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MultiUnion").getOrCreate()
data1 = [("Alice", 25)]
data2 = [("Bob", 30)]
data3 = [("Cathy", 22)]
df1 = spark.createDataFrame(data1, ["name", "age"])
df2 = spark.createDataFrame(data2, ["name", "age"])
df3 = spark.createDataFrame(data3, ["name", "age"])
multi_union_df = df1.union(df2).union(df3)
multi_union_df.show()
# Output:
# +-----+---+
# | name|age|
# +-----+---+
# |Alice| 25|
# |  Bob| 30|
# |Cathy| 22|
# +-----+---+
spark.stop()
Three DataFrames (df1, df2, df3) each have one row with the same schema. The chained union(df2).union(df3) call stacks all rows, resulting in 3 rows. The show() output shows the combined data. This method scales to append multiple DataFrames efficiently.
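When there are many DataFrames, chaining union calls by hand gets verbose; folding with Python’s functools.reduce is a common alternative. A minimal sketch, assuming df1, df2, and df3 are still live (i.e., before spark.stop()):
from functools import reduce
from pyspark.sql import DataFrame
dfs = [df1, df2, df3]  # any number of DataFrames with the same column layout
reduced_df = reduce(DataFrame.union, dfs)  # equivalent to df1.union(df2).union(df3)
reduced_df.show()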
3. Union with Duplicates Preserved
The union operation preserves duplicates when combining DataFrames, including identical rows within or across the datasets. This is valuable when you need to retain all occurrences, such as logging events or tracking repeated entries, without deduplication.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DuplicateUnion").getOrCreate()
data1 = [("Alice", 25), ("Bob", 30)]
data2 = [("Bob", 30), ("Cathy", 22)]
df1 = spark.createDataFrame(data1, ["name", "age"])
df2 = spark.createDataFrame(data2, ["name", "age"])
dup_union_df = df1.union(df2)
dup_union_df.show()
# Output:
# +-----+---+
# | name|age|
# +-----+---+
# |Alice| 25|
# |  Bob| 30|
# |  Bob| 30|
# |Cathy| 22|
# +-----+---+
spark.stop()
The DataFrame df1 contains the row ("Bob", 30), and df2 repeats it. The union(df2) call stacks all rows, preserving both "Bob, 30" entries. The show() output shows 4 rows, including both instances. This method ensures all data, even duplicates, is retained.
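If deduplication is wanted instead, applying distinct after the union gives SQL UNION semantics. A brief sketch, reusing df1 and df2 from above (before spark.stop()):
dedup_df = df1.union(df2).distinct()
dedup_df.show()  # 3 rows: the repeated ("Bob", 30) collapses to one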
4. Union with Column Renaming for Schema Alignment
The union operation matches columns by position, so when inputs hold the same data under different names, the result silently keeps the first DataFrame’s column names. Renaming with withColumnRenamed before the union keeps the combined schema accurate and the code intention clear, allowing seamless stacking of datasets with mismatched column names.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("RenameUnion").getOrCreate()
data1 = [("Alice", 25), ("Bob", 30)]
data2 = [("Cathy", 22), ("David", 28)]
df1 = spark.createDataFrame(data1, ["full_name", "age"])
df2 = spark.createDataFrame(data2, ["name", "years"])
renamed_df2 = df2.withColumnRenamed("name", "full_name").withColumnRenamed("years", "age")
rename_union_df = df1.union(renamed_df2)
rename_union_df.show()
# Output:
# +---------+---+
# |full_name|age|
# +---------+---+
# | Alice| 25|
# | Bob| 30|
# | Cathy| 22|
# | David| 28|
# +---------+---+
spark.stop()
The DataFrame df1 uses "full_name" and "age," while df2 uses "name" and "years." The withColumnRenamed calls align df2’s schema to df1’s, and union(renamed_df2) stacks the rows. The show() output shows a unified DataFrame. This method resolves schema mismatches.
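When the inputs share column names but not positions, unionByName matches columns by name instead of position; since Spark 3.1 it also accepts allowMissingColumns=True to null-fill columns absent from one side. A short sketch with the columns deliberately reordered:
data_a = [("Alice", 25)]
data_b = [(30, "Bob")]
df_a = spark.createDataFrame(data_a, ["name", "age"])
df_b = spark.createDataFrame(data_b, ["age", "name"])
by_name_df = df_a.unionByName(df_b)  # aligns on names, so Bob's age stays 30
by_name_df.show()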
5. Union with Filtering or Transformation
The union operation can be combined with filter or other transformations post-union to refine the combined DataFrame, such as removing unwanted rows or adjusting data. This is useful when you need to process the unified dataset further, like excluding nulls or standardizing values, after stacking.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("FilteredUnion").getOrCreate()
data1 = [("Alice", 25), ("Bob", None)]
data2 = [("Cathy", 22), ("David", 28)]
df1 = spark.createDataFrame(data1, ["name", "age"])
df2 = spark.createDataFrame(data2, ["name", "age"])
filtered_union_df = df1.union(df2).filter(col("age").isNotNull())
filtered_union_df.show()
# Output:
# +-----+---+
# | name|age|
# +-----+---+
# |Alice| 25|
# |Cathy| 22|
# |David| 28|
# +-----+---+
spark.stop()
The union(df2) call combines df1 and df2, including a null age for Bob. The filter(col("age").isNotNull()) removes rows with null ages, and show() displays the filtered result. This method refines the unioned data efficiently.
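A related pattern is transforming before the union, for instance tagging each row with its origin so the combined data stays traceable. A small sketch using pyspark.sql.functions.lit, with df1 and df2 as above:
from pyspark.sql.functions import lit
tagged_df = df1.withColumn("source", lit("df1")).union(
    df2.withColumn("source", lit("df2"))
)
tagged_df.show()  # each row carries a source column naming its origin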
Common Use Cases of the Union Operation
The union operation serves various practical purposes in data integration.
1. Combining Data from Multiple Sources
The union operation merges datasets from different sources, such as logs or files.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MultiSourceUnion").getOrCreate()
data1 = [("Alice", 25), ("Bob", 30)]
data2 = [("Cathy", 22), ("David", 28)]
df1 = spark.createDataFrame(data1, ["name", "age"])
df2 = spark.createDataFrame(data2, ["name", "age"])
source_union_df = df1.union(df2)
source_union_df.show()
# Output:
# +-----+---+
# | name|age|
# +-----+---+
# |Alice| 25|
# |  Bob| 30|
# |Cathy| 22|
# |David| 28|
# +-----+---+
spark.stop()
Data from two sources is combined.
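In practice the sources are often files rather than in-memory lists. A hedged sketch, with hypothetical paths, of unioning two JSON extracts:
# hypothetical paths for illustration
jan_df = spark.read.json("logs/2024-01.json")
feb_df = spark.read.json("logs/2024-02.json")
logs_df = jan_df.union(feb_df)  # assumes both files yield the same columns in the same order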
2. Appending New Records
The union operation adds new records to an existing dataset, such as updates.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AppendRecords").getOrCreate()
data1 = [("Alice", 25), ("Bob", 30)]
data2 = [("Cathy", 22)]
df1 = spark.createDataFrame(data1, ["name", "age"])
df2 = spark.createDataFrame(data2, ["name", "age"])
append_df = df1.union(df2)
append_df.show()
# Output:
# +-----+---+
# | name|age|
# +-----+---+
# |Alice| 25|
# |  Bob| 30|
# |Cathy| 22|
# +-----+---+
spark.stop()
New record "Cathy" is appended.
3. Consolidating Partitioned Data
The union operation consolidates data split across partitions or files.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ConsolidatePartitions").getOrCreate()
data1 = [("Alice", "HR", 25)]
data2 = [("Bob", "IT", 30)]
df1 = spark.createDataFrame(data1, ["name", "dept", "age"]).repartition(1)
df2 = spark.createDataFrame(data2, ["name", "dept", "age"]).repartition(1)
consolidated_df = df1.union(df2)
consolidated_df.show()
# Output:
# +-----+----+---+
# | name|dept|age|
# +-----+----+---+
# |Alice|  HR| 25|
# |  Bob|  IT| 30|
# +-----+----+---+
spark.stop()
Partitioned data is unified.
4. Building Comprehensive Datasets
The union operation builds datasets by combining similar records.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BuildDataset").getOrCreate()
data1 = [("Alice", 25)]
data2 = [("Bob", 30)]
df1 = spark.createDataFrame(data1, ["name", "age"])
df2 = spark.createDataFrame(data2, ["name", "age"])
build_df = df1.union(df2)
build_df.show()
# Output:
# +-----+---+
# | name|age|
# +-----+---+
# |Alice| 25|
# |  Bob| 30|
# +-----+---+
spark.stop()
A comprehensive dataset is built.
FAQ: Answers to Common Union Questions
Below are answers to frequently asked questions about the union operation in PySpark.
Q: How does union differ from unionAll?
A: They behave identically; unionAll is a deprecated alias for union kept for backward compatibility. Unlike SQL’s UNION, neither removes duplicates.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FAQVsUnionAll").getOrCreate()
data1 = [("Alice", 25)]
data2 = [("Bob", 30)]
df1 = spark.createDataFrame(data1, ["name", "age"])
df2 = spark.createDataFrame(data2, ["name", "age"])
union_df = df1.union(df2)
union_all_df = df1.unionAll(df2)
union_df.show()
# Output:
# +-----+---+
# | name|age|
# +-----+---+
# |Alice| 25|
# |  Bob| 30|
# +-----+---+
union_all_df.show() # Same output
spark.stop()
Both produce the same result.
Q: Does union remove duplicates?
A: No, union preserves all rows, including duplicates; call distinct() on the result if you need deduplication.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FAQDuplicates").getOrCreate()
data1 = [("Alice", 25)]
data2 = [("Alice", 25)]
df1 = spark.createDataFrame(data1, ["name", "age"])
df2 = spark.createDataFrame(data2, ["name", "age"])
dup_df = df1.union(df2)
dup_df.show()
# Output:
# +-----+---+
# | name|age|
# +-----+---+
# |Alice| 25|
# |Alice| 25|
# +-----+---+
spark.stop()
Duplicates are retained.
Q: How does union handle null values?
A: Nulls are preserved in the combined DataFrame.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FAQNulls").getOrCreate()
data1 = [("Alice", 25)]
data2 = [("Bob", None)]
df1 = spark.createDataFrame(data1, ["name", "age"])
df2 = spark.createDataFrame(data2, ["name", "age"])
null_df = df1.union(df2)
null_df.show()
# Output:
# +-----+----+
# | name| age|
# +-----+----+
# |Alice|  25|
# |  Bob|null|
# +-----+----+
spark.stop()
"Bob"’s null age is included.
Q: Does union affect performance?
A: union itself is cheap; it’s a narrow transformation that concatenates the inputs’ partitions without shuffling data. The combined DataFrame carries the sum of both inputs’ partition counts, so repeated unions can accumulate many small partitions worth coalescing.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FAQPerformance").getOrCreate()
data1 = [("Alice", 25)]
data2 = [("Bob", 30)]
df1 = spark.createDataFrame(data1, ["name", "age"])
df2 = spark.createDataFrame(data2, ["name", "age"])
perf_df = df1.union(df2)
perf_df.show()
# Output:
# +-----+---+
# | name|age|
# +-----+---+
# |Alice| 25|
# |  Bob| 30|
# +-----+---+
spark.stop()
Small unions are lightweight.
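One concrete cost is partition count: the unioned DataFrame carries the partitions of both inputs, so counts add up across repeated unions. A sketch, with df1 and df2 as above, checking the count and compacting:
combined = df1.union(df2)
print(combined.rdd.getNumPartitions())  # sum of df1's and df2's partition counts
compacted = combined.coalesce(2)  # merge small partitions without a full shuffle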
Q: Can I union DataFrames with different schemas?
A: union requires the same number of columns and matches them by position, so mismatched names won’t necessarily raise an error but can silently mislabel data; rename or cast columns first, or use unionByName to match by name.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("FAQSchema").getOrCreate()
data1 = [("Alice", 25)]
data2 = [("Bob", "30")]
df1 = spark.createDataFrame(data1, ["name", "age"])
df2 = spark.createDataFrame(data2, ["name", "years"])
# df1.union(df2) would match by position here, silently placing the string "30" under df1's age column
df2_aligned = df2.withColumnRenamed("years", "age").withColumn("age", col("age").cast("long"))
schema_df = df1.union(df2_aligned)
schema_df.show()
# Output:
# +-----+---+
# | name|age|
# +-----+---+
# |Alice| 25|
# |  Bob| 30|
# +-----+---+
spark.stop()
Renaming and casting align the schemas for union.
Union vs Other DataFrame Operations
The union operation stacks DataFrames vertically, unlike join (merges horizontally), groupBy (aggregates groups), or filter (selects rows by condition). It differs from withColumn (adds or modifies columns) by combining whole datasets, and it benefits from Catalyst optimizations that raw RDD unions do not.
More details at DataFrame Operations.
Conclusion
The union operation in PySpark is a versatile way to combine DataFrame data vertically. Master it with PySpark Fundamentals to enhance your data integration skills!