How to Save a PySpark DataFrame to a Parquet File: The Ultimate Guide
Published on April 17, 2025
Diving Straight into Saving a PySpark DataFrame to a Parquet File
Saving a PySpark DataFrame to a Parquet file is a powerful technique for data engineers using Apache Spark, enabling efficient storage, compression, and querying of large datasets. Parquet, a columnar storage format, is optimized for big data processing, offering benefits like reduced storage size and faster read performance compared to formats like CSV. This comprehensive guide explores the syntax and steps for saving a DataFrame to a Parquet file, with targeted examples covering basic Parquet writing, customizing options, handling nested data, and using SQL-based approaches. Each section addresses a specific aspect of Parquet export, supported by practical code, error handling, and performance optimization strategies to build robust ETL pipelines. The primary method, write.parquet(), is explained with all its parameters. Let’s save those DataFrames! For more on PySpark, see PySpark Fundamentals.
Saving a DataFrame to a Parquet File with Default Settings
The primary method for saving a PySpark DataFrame to a Parquet file is write.parquet(), which exports the DataFrame to a directory of Parquet files, leveraging the format's columnar storage and compression. By default, it uses Snappy compression and typically writes one part file per DataFrame partition rather than a single file, which suits distributed ETL pipelines.
Understanding write.parquet() Parameters
The write.parquet() method is part of the DataFrameWriter interface, accessed via df.write. It accepts the following key parameters:
- path (str, required): The directory path where the Parquet file(s) will be saved (e.g., "output/data.parquet"). Spark creates a directory with one or more Parquet files based on partitioning.
- mode (str, optional, default="error"): Specifies behavior if the output path exists:
  - "error" (alias "errorifexists"): Raises an error if the path exists.
  - "overwrite": Replaces the existing path.
  - "append": Adds new files to the existing path.
  - "ignore": Silently skips the write if the path exists.
- partitionBy (str or list, optional, default=None): Partitions the output by the specified column(s), creating subdirectories (e.g., department=HR).
- compression (str, optional): Compression codec for Parquet files (e.g., "snappy", "gzip", "lzo", "none"). When omitted, Spark uses the spark.sql.parquet.compression.codec setting, which defaults to "snappy".
- Schema: write.parquet() has no schema parameter. It always writes the DataFrame's own schema, so select or cast columns beforehand if the stored schema needs to differ.
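The four save modes can be read as a small decision table. The sketch below is plain Python rather than Spark code: plan_write and its action names are hypothetical, and it only models what the mode descriptions above say should happen when the output path does or does not exist.

```python
def plan_write(mode: str, path_exists: bool) -> str:
    """Return the action implied by a save mode.

    Illustrative model only: the action names ("write", "replace",
    "append", "skip") are made up for this sketch and are not Spark APIs.
    """
    mode = mode.lower()
    if mode in ("error", "errorifexists"):
        if path_exists:
            raise FileExistsError("path already exists")
        return "write"
    if mode == "overwrite":
        return "replace" if path_exists else "write"
    if mode == "append":
        return "append" if path_exists else "write"
    if mode == "ignore":
        return "skip" if path_exists else "write"
    raise ValueError(f"unknown save mode: {mode!r}")


for mode in ("overwrite", "append", "ignore"):
    print(mode, "->", plan_write(mode, path_exists=True))
```

When the path does not exist, every mode simply writes; the modes only differ once something is already there.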
Here’s an example saving a DataFrame to a Parquet file with default settings:
from pyspark.sql import SparkSession
# Initialize SparkSession
spark = SparkSession.builder.appName("SaveToParquet").getOrCreate()
# Create DataFrame
data = [
    ("E001", "Alice", 25, 75000.0, "HR"),
    ("E002", "Bob", 30, 82000.5, "IT"),
    ("E003", "Cathy", 28, 90000.75, "HR"),
    ("E004", "David", 35, 100000.25, "IT"),
    ("E005", "Eve", 28, 78000.0, "Finance")
]
df = spark.createDataFrame(data, ["employee_id", "name", "age", "salary", "department"])
# Save DataFrame to Parquet
df.write.parquet("output/employees.parquet")
Output: A directory named output/employees.parquet is created, containing Parquet files (e.g., part-00000-*.parquet). The files are compressed with Snappy by default and include the DataFrame's schema.
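Because the output is a directory rather than a single file, it can help to look inside it after a write. The following is a minimal sketch using only the standard library; summarize_parquet_dir is a hypothetical helper, and it assumes the output lives on a local filesystem and that the job leaves the usual _SUCCESS marker, which depends on the output committer configuration.

```python
from pathlib import Path


def summarize_parquet_dir(path: str) -> dict:
    """List the data files and commit marker in a Spark output directory."""
    root = Path(path)
    # rglob also finds part files inside partition subdirectories.
    part_files = sorted(
        p.relative_to(root).as_posix() for p in root.rglob("part-*.parquet")
    )
    return {
        "part_files": part_files,
        "has_success_marker": (root / "_SUCCESS").exists(),
    }


# Usage, after the write above has run:
# print(summarize_parquet_dir("output/employees.parquet"))
```

A missing marker or an empty file list is a quick hint that the write did not complete as expected.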
Validate by reading the Parquet file back:
read_df = spark.read.parquet("output/employees.parquet")
assert read_df.count() == 5, "Incorrect row count in Parquet"
assert "Alice" in [row["name"] for row in read_df.select("name").collect()], "Expected data missing"
Error to Watch: Writing to an existing path with the default mode fails:
try:
    df.write.parquet("output/employees.parquet")  # Path already exists
except Exception as e:
    print(f"Error: {e}")
Output (abbreviated; the exact wording varies by Spark version):
Error: Path output/employees.parquet already exists
Fix: Use mode="overwrite":
df.write.mode("overwrite").parquet("output/employees.parquet")
Customizing Parquet Output with Advanced Options
To tailor the Parquet output, use write.parquet() options like compression and partitionBy. These allow you to optimize storage, improve query performance, or organize data hierarchically, which is valuable for large-scale ETL pipelines.
# Save DataFrame with custom Parquet options
df.write \
    .mode("overwrite") \
    .option("compression", "gzip") \
    .partitionBy("department") \
    .parquet("output/employees_custom.parquet")
Output: A directory named output/employees_custom.parquet is created with subdirectories like department=HR, department=IT, and department=Finance, each containing Gzip-compressed Parquet files (e.g., part-00000-*.gz.parquet).
Validate:
from pyspark.sql.functions import col
read_df = spark.read.parquet("output/employees_custom.parquet")
assert read_df.count() == 5, "Incorrect row count in Parquet"
assert read_df.filter(col("department") == "HR").count() == 2, "Incorrect partition count for HR"
The partitionBy("department") call organizes data into subdirectories, so queries that filter on department can skip the directories they do not need. The "gzip" codec reduces file size at the cost of increased write time. Supported codecs include "snappy", "gzip", "lzo", and "none".
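The subdirectory names carry the partition values, which is why they can be recovered from a file path alone. Here is a small sketch of that idea in plain Python; partition_values is a hypothetical helper, and it assumes forward-slash paths and values without escaped special characters.

```python
from pathlib import PurePosixPath


def partition_values(file_path: str) -> dict:
    """Extract column=value pairs from the directories of a part-file path."""
    values = {}
    for segment in PurePosixPath(file_path).parent.parts:
        name, sep, value = segment.partition("=")
        if sep and name:
            # Values come back as strings; any typing is left to the caller.
            values[name] = value
    return values


print(partition_values(
    "output/employees_custom.parquet/department=HR/part-00000-abc.gz.parquet"
))
# {'department': 'HR'}
```

The same layout is what lets a reader restore the department column even though it is not stored inside the data files of each subdirectory.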
Error to Watch: Invalid partition column fails:
try:
    df.write.partitionBy("invalid_column").parquet("output/employees.parquet")
except Exception as e:
    print(f"Error: {e}")
Output (abbreviated; the exact wording varies by Spark version):
Error: Column 'invalid_column' does not exist
Fix: Verify the partition column before writing:
assert "department" in df.columns, "Partition column missing"
Saving Nested Data to a Parquet File
Nested DataFrames, with structs or arrays, are natively supported by Parquet’s columnar format, making it ideal for storing complex data without flattening. You can save nested structures directly, preserving their hierarchy, which is crucial for ETL pipelines handling structured data like employee contact details.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType
spark = SparkSession.builder.appName("NestedSaveToParquet").getOrCreate()
# Define schema with nested structs
schema = StructType([
    StructField("employee_id", StringType(), False),
    StructField("name", StringType(), True),
    StructField("contact", StructType([
        StructField("phone", LongType(), True),
        StructField("email", StringType(), True)
    ]), True),
    StructField("department", StringType(), True)
])
# Create DataFrame
data = [
    ("E001", "Alice", (1234567890, "alice@company.com"), "HR"),
    ("E002", "Bob", (None, "bob@company.com"), "IT"),
    ("E003", "Cathy", (5555555555, "cathy@company.com"), "HR"),
    ("E004", "David", (9876543210, "david@company.com"), "IT")
]
df = spark.createDataFrame(data, schema)
# Save nested DataFrame to Parquet
df.write.mode("overwrite").parquet("output/employees_nested.parquet")
Output: A directory named output/employees_nested.parquet is created, containing Parquet files with the nested contact struct preserved.
Validate:
from pyspark.sql.functions import col
read_df = spark.read.parquet("output/employees_nested.parquet")
assert read_df.count() == 4, "Incorrect row count in Parquet"
assert read_df.filter(col("contact.email") == "alice@company.com").count() == 1, "Expected nested data missing"
read_df.printSchema()
Schema Output:
root
 |-- employee_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- contact: struct (nullable = true)
 |    |-- phone: long (nullable = true)
 |    |-- email: string (nullable = true)
 |-- department: string (nullable = true)
This confirms the nested structure is preserved. Note that employee_id comes back as nullable even though it was declared non-nullable, because Spark marks every column as nullable when reading Parquet. Parquet's columnar format efficiently stores and queries nested data, unlike CSV, which requires flattening.
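Nested fields are addressed with dotted paths such as contact.email, so it can be handy to enumerate them. The sketch below walks a schema dictionary of the shape returned by StructType.jsonValue(); leaf_paths is a hypothetical helper, the dictionary is a hand-written stand-in for read_df.schema.jsonValue(), and arrays or maps are simply treated as leaves.

```python
def leaf_paths(struct: dict, prefix: str = "") -> list:
    """Return dotted paths for every non-struct field in a schema dict."""
    paths = []
    for field in struct["fields"]:
        name = prefix + field["name"]
        ftype = field["type"]
        if isinstance(ftype, dict) and ftype.get("type") == "struct":
            # Recurse into nested structs, extending the dotted prefix.
            paths.extend(leaf_paths(ftype, name + "."))
        else:
            paths.append(name)
    return paths


# Hand-written stand-in for read_df.schema.jsonValue()
schema_json = {
    "type": "struct",
    "fields": [
        {"name": "employee_id", "type": "string", "nullable": True, "metadata": {}},
        {"name": "name", "type": "string", "nullable": True, "metadata": {}},
        {
            "name": "contact",
            "type": {
                "type": "struct",
                "fields": [
                    {"name": "phone", "type": "long", "nullable": True, "metadata": {}},
                    {"name": "email", "type": "string", "nullable": True, "metadata": {}},
                ],
            },
            "nullable": True,
            "metadata": {},
        },
        {"name": "department", "type": "string", "nullable": True, "metadata": {}},
    ],
}

print(leaf_paths(schema_json))
# ['employee_id', 'name', 'contact.phone', 'contact.email', 'department']
```

Each returned path can be passed to col() or select() to read just that nested field.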
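Error to Watch: Schema changes on append can slip through unnoticed:
from pyspark.sql.functions import col
try:
    # Simulate schema change
    altered_df = df.withColumn("new_field", col("name"))
    altered_df.write.mode("append").parquet("output/employees_nested.parquet")
except Exception as e:
    print(f"Error: {e}")
A path-based Parquet append generally does not compare the incoming schema with the files already in the directory, so this write may succeed without raising. The mismatch then tends to surface at read time: the new column can be missing unless the mergeSchema read option is enabled, and conflicting types for the same column can make the read fail.
Fix: Check schema consistency before appending, or use overwrite:
assert altered_df.schema == df.schema, "Schema mismatch, use overwrite or align schemas"
Saving to Parquet Using SQL Queries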
For SQL-based ETL workflows or teams familiar with database querying, SQL queries via temporary views offer an alternative way to save a DataFrame to Parquet. The SELECT statement prepares the data, and write.parquet() saves the result.
# Create temporary view (df here is the five-row employee DataFrame from the first example, not the nested one)
df.createOrReplaceTempView("employees")
# Save to Parquet using SQL
sql_df = spark.sql("SELECT employee_id, name, age, salary, department FROM employees")
sql_df.write.mode("overwrite").parquet("output/employees_sql.parquet")
Output: A directory named output/employees_sql.parquet is created, containing Parquet files.
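For teams that prefer to stay entirely in SQL, Spark SQL also has an INSERT OVERWRITE DIRECTORY ... USING parquet statement. The sketch below only builds that statement as a string; insert_overwrite_directory_sql and the /tmp path are hypothetical, and how the path is resolved depends on the cluster's filesystem configuration, so treat it as a starting point rather than a drop-in replacement for write.parquet().

```python
def insert_overwrite_directory_sql(path: str, select_sql: str) -> str:
    """Build an INSERT OVERWRITE DIRECTORY statement that writes Parquet."""
    if "'" in path:
        # Keep the sketch simple: refuse paths that would need escaping.
        raise ValueError("path must not contain single quotes")
    return f"INSERT OVERWRITE DIRECTORY '{path}' USING parquet {select_sql}"


stmt = insert_overwrite_directory_sql(
    "/tmp/employees_sql_direct.parquet",  # hypothetical output location
    "SELECT employee_id, name FROM employees",
)
print(stmt)
# To execute, with the employees view registered on an active session:
# spark.sql(stmt)
```

The validation step that follows still refers to output/employees_sql.parquet written by write.parquet() above.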
Validate:
read_df = spark.read.parquet("output/employees_sql.parquet")
assert read_df.count() == 5, "Incorrect row count in Parquet"
assert "Bob" in [row["name"] for row in read_df.select("name").collect()], "Expected name missing"
Error to Watch: Querying an unregistered view fails:
try:
    sql_df = spark.sql("SELECT * FROM nonexistent")
    sql_df.write.parquet("output/invalid.parquet")
except Exception as e:
    print(f"Error: {e}")
Output (abbreviated; the exact wording varies by Spark version):
Error: Table or view not found: nonexistent
Fix: Register the view, then verify it exists:
df.createOrReplaceTempView("employees")
assert "employees" in [v.name for v in spark.catalog.listTables()], "View missing"
Optimizing Performance for Saving to Parquet
Saving a DataFrame to Parquet involves distributed writing across executors, which can be I/O-intensive for large datasets. Parquet’s columnar format and compression reduce storage needs, but optimization ensures efficient export:
- Select Relevant Columns: Minimize data written:
  df = df.select("employee_id", "name", "salary")
- Filter Rows: Reduce rows before writing:
  df = df.filter(col("salary") > 0)
- Repartition Data: Control output file count and partitioning:
  df = df.repartition(4)  # Balance file size and parallelism
- Use Efficient Compression: Choose "snappy" for speed or "gzip" for size:
  df.write.option("compression", "snappy").parquet("output/employees.parquet")
- Partition Strategically: Use partitionBy for query performance:
  df.write.partitionBy("department").parquet("output/employees.parquet")
Example optimized save:
from pyspark.sql.functions import col
optimized_df = df.select("employee_id", "name", "salary").filter(col("salary") > 0)
optimized_df.repartition(4).write.mode("overwrite").option("compression", "snappy").parquet("output/employees_optimized.parquet")
Monitor I/O and executor performance via the Spark UI to identify bottlenecks.
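The repartition(4) above is a fixed number, but the count can also be derived from the data size. The following is a rough sketch, not a Spark API: target_partitions is a hypothetical helper, and the 128 MiB target per file is an assumed rule of thumb that should be tuned for the storage system and query patterns in use.

```python
def target_partitions(total_bytes: int, target_file_bytes: int = 128 * 1024 * 1024) -> int:
    """Estimate how many output files keep each file near a target size."""
    if total_bytes < 0 or target_file_bytes <= 0:
        raise ValueError("sizes must be non-negative, target must be positive")
    # Integer ceiling division, with a floor of one partition.
    return max(1, (total_bytes + target_file_bytes - 1) // target_file_bytes)


print(target_partitions(10 * 1024**3))  # 10 GiB at 128 MiB per file -> 80
# Usage with a DataFrame (estimated_bytes would come from your own measurement):
# df.repartition(target_partitions(estimated_bytes)).write.parquet(...)
```

The estimate is most useful when total_bytes reflects the expected compressed output size rather than the in-memory size, which is usually much larger.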
Error to Watch: Excessive partitions create many small files:
# Example with many partitions
df.repartition(1000).write.parquet("output/employees_many_files.parquet")  # Creates many small files
Fix: Adjust partitions:
assert df.rdd.getNumPartitions() <= 10, "Too many partitions, consider repartition"
Wrapping Up Your Parquet Saving Mastery
Saving a PySpark DataFrame to a Parquet file is a vital skill for efficient data storage and querying in ETL pipelines. Whether you’re using write.parquet() with default or customized options, preserving nested data structures, or leveraging SQL queries for intuitive exports, Spark provides powerful tools to address diverse data storage needs. By mastering these techniques, optimizing performance, and anticipating errors, you can create compact, query-optimized datasets that integrate seamlessly with big data ecosystems. These methods will enhance your data engineering workflows, empowering you to manage data storage with confidence.
Try these approaches in your next Spark job, and share your experiences, tips, or questions in the comments or on X. Keep exploring with DataFrame Operations to deepen your PySpark expertise!