How to Create a PySpark DataFrame from a CSV File: The Ultimate Guide
Published on April 17, 2025
Diving Straight into Creating PySpark DataFrames from CSV Files
Got a CSV file—say, employee data with IDs, names, and salaries—ready to scale up for big data analytics? Creating a PySpark DataFrame from a CSV file is a must-have skill for any data engineer building ETL pipelines with Apache Spark’s distributed power. This guide jumps right into the syntax and practical steps for creating a PySpark DataFrame from a CSV file, packed with examples showing how to handle different scenarios, from simple to complex. We’ll tackle common errors to keep your pipelines rock-solid. Let’s load that data like a pro! For a broader introduction to PySpark, check out Introduction to PySpark.
How to Create a PySpark DataFrame from a CSV File
The primary way to create a PySpark DataFrame from a CSV file is spark.read.csv, the csv method on the DataFrameReader exposed by the SparkSession. The SparkSession is Spark's unified entry point (it wraps the older SparkContext used for RDD operations) and lets you load a CSV file into a distributed DataFrame, either inferring the schema or applying a custom schema for type control. Here's the basic syntax:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CreateDataFrameFromCSV").getOrCreate()
df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True)
It’s like turning your CSV file into a distributed table ready for Spark’s magic. Let’s try it with an employee CSV file, a common ETL scenario, with columns for employee IDs, names, ages, and salaries. Assume employees.csv contains:
employee_id,name,age,salary
E001,Alice,25,75000.00
E002,Bob,30,82000.50
E003,Cathy,28,90000.75
E004,David,35,100000.25
Here’s the code to load it:
from pyspark.sql import SparkSession
# Initialize SparkSession
spark = SparkSession.builder.appName("CreateDataFrameFromCSV").getOrCreate()
# Load CSV file
df = spark.read.csv("employees.csv", header=True, inferSchema=True)
df.show(truncate=False)
df.printSchema()
Output:
+-----------+-----+---+---------+
|employee_id|name |age|salary |
+-----------+-----+---+---------+
|E001 |Alice|25 |75000.0 |
|E002 |Bob |30 |82000.5 |
|E003 |Cathy|28 |90000.75 |
|E004 |David|35 |100000.25|
+-----------+-----+---+---------+
root
|-- employee_id: string (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- salary: double (nullable = true)
This creates a DataFrame ready for Spark operations, like a SQL table, ideal for ETL pipelines. The header=True option uses the first row as column names, and inferSchema=True detects data types at the cost of an extra pass over the file. Check out Show Operation for display tips. A common error is a malformed CSV file, such as rows with missing columns; validate the file structure before loading, as in the sketch below. For more on SparkSession, see SparkSession in PySpark.
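Before loading, a quick structural check with Python's standard csv module can catch ragged rows early. This is a minimal pre-flight sketch, assuming the employees.csv file above and its four expected columns:
import csv
# Pre-flight check: confirm every row has the expected number of columns
expected_columns = 4
with open("employees.csv", newline="") as f:
    rows = list(csv.reader(f))
header, data_rows = rows[0], rows[1:]
assert len(header) == expected_columns, f"Unexpected header: {header}"
bad_rows = [r for r in data_rows if len(r) != expected_columns]
assert not bad_rows, f"Inconsistent CSV rows: {bad_rows}"
Unlike a bare line.split(","), csv.reader respects quoted fields that contain commas, so the same check works for files like employees_complex.csv later in this guide.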
How to Create a DataFrame from a Simple CSV File
A simple CSV file has uniform columns with basic types like strings, integers, and floats, perfect for straightforward ETL tasks like those in ETL Pipelines. Using the same employees.csv:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SimpleCSV").getOrCreate()
df_simple = spark.read.csv("employees.csv", header=True, inferSchema=True)
df_simple.show(truncate=False)
Output:
+-----------+-----+---+---------+
|employee_id|name |age|salary |
+-----------+-----+---+---------+
|E001 |Alice|25 |75000.0 |
|E002 |Bob |30 |82000.5 |
|E003 |Cathy|28 |90000.75 |
|E004 |David|35 |100000.25|
+-----------+-----+---+---------+
Error to Watch: Malformed rows, like a line with a missing column, raise errors when you read with mode="FAILFAST" (the default PERMISSIVE mode loads them with nulls instead):
employee_id,name,age,salary
E001,Alice,25,75000.00
E002,Bob,30 # Missing salary
E003,Cathy,28,90000.75
try:
    df_simple_invalid = spark.read.option("mode", "FAILFAST").csv("employees_invalid.csv", header=True, inferSchema=True)
    df_simple_invalid.show()
except Exception as e:
    print(f"Error: {e}")
Output:
Error: Malformed CSV record
Fix: Keep the default mode="PERMISSIVE" so malformed rows load with nulls you can repair, use mode="DROPMALFORMED" to skip them, or clean the CSV: df_simple_invalid = spark.read.option("mode", "PERMISSIVE").csv("employees_invalid.csv", header=True, inferSchema=True). Validate the column counts up front with the csv-module check shown earlier, or capture the bad rows for inspection as sketched below.
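If you want to keep the good rows while still seeing what was wrong with the bad ones, one option is Spark's corrupt-record column. This is a minimal sketch, assuming the employees_invalid.csv file above; _corrupt_record is Spark's default name for this column, and it must be declared in the schema you supply:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
# Add a string column to receive the raw text of any row Spark cannot parse
schema_with_corrupt = StructType([
    StructField("employee_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", DoubleType(), True),
    StructField("_corrupt_record", StringType(), True)
])
df_checked = spark.read \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .schema(schema_with_corrupt) \
    .csv("employees_invalid.csv", header=True)
# Rows that parsed cleanly have null here; the rest carry the original malformed line
df_checked.filter(df_checked["_corrupt_record"].isNotNull()).show(truncate=False)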
How to Create a DataFrame from a CSV File with Null Values
CSV files often have missing values, like absent names or salaries, usually written as empty fields. Spark loads these as nulls, as seen in Column Null Handling. Assume employees_nulls.csv:
employee_id,name,age,salary
E001,Alice,25,75000.00
E002,,30,82000.50
E003,Cathy,28,
E004,,35,100000.25
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NullCSV").getOrCreate()
df_nulls = spark.read.csv("employees_nulls.csv", header=True, inferSchema=True)
df_nulls.show(truncate=False)
Output:
+-----------+-----+----+---------+
|employee_id|name |age |salary |
+-----------+-----+----+---------+
|E001 |Alice|25 |75000.0 |
|E002 |null |30 |82000.5 |
|E003 |Cathy|28 |null |
|E004 |null |35 |100000.25|
+-----------+-----+----+---------+
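Once the nulls are loaded you can repair or drop them before downstream joins and aggregations. A small sketch, assuming the df_nulls DataFrame above; the replacement values ("unknown", 0.0) are illustrative defaults, not a recommendation:
# Replace nulls with per-column defaults (illustrative values)
df_filled = df_nulls.fillna({"name": "unknown", "salary": 0.0})
df_filled.show(truncate=False)
# Or drop rows missing any of the listed columns
df_complete = df_nulls.dropna(subset=["name", "salary"])
df_complete.show(truncate=False)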
Error to Watch: Don't rely on a strict schema to reject these nulls. Marking a field non-nullable in a custom schema does not make the CSV reader refuse rows with missing values; Spark's file readers generally treat the supplied schema as nullable, so the nulls load silently and only surface later in joins, aggregations, or writes that expect complete data:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
schema_strict = StructType([
    StructField("employee_id", StringType(), False),
    StructField("name", StringType(), False),  # intended as required, but nulls still load
    StructField("age", IntegerType(), True),
    StructField("salary", DoubleType(), True)
])
df_nulls_strict = spark.read.csv("employees_nulls.csv", header=True, schema=schema_strict)
df_nulls_strict.show(truncate=False)  # name is still null for E002 and E004
Fix: Handle missing values explicitly instead of trusting nullability flags: map empty fields to nulls with spark.read.option("nullValue", "").csv("employees_nulls.csv", header=True, inferSchema=True), then repair or drop them as shown above. Validate by counting nulls per column, as in the sketch below.
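Here is a minimal null-audit sketch with the imports spelled out, assuming the df_nulls DataFrame from this section:
from pyspark.sql.functions import col, sum as spark_sum
# Count nulls per column: cast each is-null flag to 1/0 and sum it
null_counts = df_nulls.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df_nulls.columns])
null_counts.show()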
How to Create a DataFrame from a CSV File with a Custom Schema
Inferring schemas is convenient but risky in production: types can drift between files, and inference adds an extra pass over the data. A custom schema ensures type safety, as covered in Schema Operations:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
spark = SparkSession.builder.appName("CustomSchemaCSV").getOrCreate()
schema_custom = StructType([
StructField("employee_id", StringType(), True),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("salary", DoubleType(), True)
])
df_custom = spark.read.csv("employees.csv", header=True, schema=schema_custom)
df_custom.show(truncate=False)
df_custom.printSchema()
Output:
+-----------+-----+---+---------+
|employee_id|name |age|salary |
+-----------+-----+---+---------+
|E001 |Alice|25 |75000.0 |
|E002 |Bob |30 |82000.5 |
|E003 |Cathy|28 |90000.75 |
|E004 |David|35 |100000.25|
+-----------+-----+---+---------+
root
|-- employee_id: string (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- salary: double (nullable = true)
Error to Watch: Values that don't match the schema fail under mode="FAILFAST"; with the default PERMISSIVE mode they load as null instead:
employee_id,name,age,salary
E001,Alice,twenty-five,75000.00 # Invalid age
E002,Bob,30,82000.50
try:
    df_custom_invalid = spark.read.option("mode", "FAILFAST").csv("employees_invalid_age.csv", header=True, schema=schema_custom)
    df_custom_invalid.show()
except Exception as e:
    print(f"Error: {e}")
Output:
Error: field age: IntegerType can not accept object 'twenty-five' in type
Fix: Clean the data first, or keep the default mode="PERMISSIVE" so unparseable values load as nulls you can repair: df_custom_invalid = spark.read.option("mode", "PERMISSIVE").csv("employees_invalid_age.csv", header=True, schema=schema_custom). Validate: with open("employees_invalid_age.csv") as f: assert all(line.split(",")[2].strip().isdigit() for line in f.readlines()[1:]), "Invalid age data". Another option is to cast after loading, as sketched below.
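A common pattern is to read suspect columns as plain strings and let a cast surface the bad rows, since cast() returns null for values it cannot convert. A sketch, assuming the hypothetical employees_invalid_age.csv above:
from pyspark.sql.functions import col
# Read everything as strings first (no schema, no inference)
df_raw = spark.read.csv("employees_invalid_age.csv", header=True)
# Null after casting, while the original string is non-null, flags a bad value
df_typed = df_raw.withColumn("age_int", col("age").cast("int"))
df_typed.filter(col("age_int").isNull() & col("age").isNotNull()).show(truncate=False)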
How to Create a DataFrame from a CSV File with Custom Delimiters
CSV files may use delimiters other than commas, like tabs or semicolons, common in diverse data sources. Use the delimiter option, as seen in Data Sources Read CSV. Assume employees_semicolon.csv:
employee_id;name;age;salary
E001;Alice;25;75000.00
E002;Bob;30;82000.50
E003;Cathy;28;90000.75
E004;David;35;100000.25
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DelimiterCSV").getOrCreate()
df_delimiter = spark.read.option("delimiter", ";").csv("employees_semicolon.csv", header=True, inferSchema=True)
df_delimiter.show(truncate=False)
Output:
+-----------+-----+---+---------+
|employee_id|name |age|salary |
+-----------+-----+---+---------+
|E001 |Alice|25 |75000.0 |
|E002 |Bob |30 |82000.5 |
|E003 |Cathy|28 |90000.75 |
|E004 |David|35 |100000.25|
+-----------+-----+---+---------+
Error to Watch: Reading with the wrong delimiter usually doesn't raise an error; with the default comma delimiter, each semicolon-separated row parses as a single field, leaving one mangled column:
df_delimiter_invalid = spark.read.csv("employees_semicolon.csv", header=True, inferSchema=True)  # Default comma delimiter
print(df_delimiter_invalid.columns)
Output:
['employee_id;name;age;salary']
Fix: Specify the correct delimiter with option("delimiter", ";") (or its alias option("sep", ";")). Validate: with open("employees_semicolon.csv") as f: assert all(line.count(";") == 3 for line in f.readlines()[1:]), "Invalid delimiter".
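If you don't know the delimiter up front, Python's csv.Sniffer can guess it from a sample of the file. A minimal sketch, assuming employees_semicolon.csv; sniffing is a heuristic, so confirm the result before relying on it:
import csv
# Guess the delimiter from the first few KB of the file
with open("employees_semicolon.csv", newline="") as f:
    dialect = csv.Sniffer().sniff(f.read(4096), delimiters=",;\t|")
print(dialect.delimiter)  # ';' for this file
df_sniffed = spark.read.option("delimiter", dialect.delimiter).csv("employees_semicolon.csv", header=True, inferSchema=True)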
How to Create a DataFrame from a CSV File with Timestamps
CSV files with timestamps, like hire dates, are key in analytics, especially for time-series tasks like Time Series Analysis. Assume employees_dates.csv:
employee_id,name,hire_date
E001,Alice,2023-01-15
E002,Bob,2022-06-30
E003,Cathy,
E004,David,2021-09-01
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
spark = SparkSession.builder.appName("TimestampCSV").getOrCreate()
schema_dates = StructType([
StructField("employee_id", StringType(), True),
StructField("name", StringType(), True),
StructField("hire_date", TimestampType(), True)
])
df_dates = spark.read.csv("employees_dates.csv", header=True, schema=schema_dates)
df_dates.show(truncate=False)
Output:
+-----------+-----+--------------------+
|employee_id|name |hire_date |
+-----------+-----+--------------------+
|E001 |Alice|2023-01-15 00:00:00 |
|E002 |Bob |2022-06-30 00:00:00 |
|E003 |Cathy|null |
|E004 |David|2021-09-01 00:00:00 |
+-----------+-----+--------------------+
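With hire_date loaded as a timestamp, time-based columns fall out easily, a common prep step for the time-series work mentioned above. A small sketch, assuming the df_dates DataFrame:
from pyspark.sql.functions import year, month, datediff, current_date
# Derive calendar fields and tenure in days from the hire_date timestamp
df_tenure = df_dates \
    .withColumn("hire_year", year("hire_date")) \
    .withColumn("hire_month", month("hire_date")) \
    .withColumn("tenure_days", datediff(current_date(), "hire_date"))
df_tenure.show(truncate=False)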
Error to Watch: Timestamps in an unexpected format fail under mode="FAILFAST" (the default PERMISSIVE mode turns them into nulls):
employee_id,name,hire_date
E001,Alice,01-15-2023 # Wrong format
E002,Bob,2022-06-30
try:
    df_dates_invalid = spark.read.option("mode", "FAILFAST").csv("employees_dates_invalid.csv", header=True, schema=schema_dates)
    df_dates_invalid.show()
except Exception as e:
    print(f"Error: {e}")
Output:
Error: field hire_date: TimestampType can not accept object '01-15-2023' in type
Fix: Tell the reader the actual format with timestampFormat (dateFormat applies to DateType columns): df_dates_invalid = spark.read.option("timestampFormat", "MM-dd-yyyy").csv("employees_dates_invalid.csv", header=True, schema=schema_dates). Validate: with open("employees_dates.csv") as f: from datetime import datetime; assert all(not line.split(",")[2].strip() or datetime.strptime(line.split(",")[2].strip(), "%Y-%m-%d") for line in f.readlines()[1:]), "Invalid date format". If the file mixes formats, load the column as a string and normalize it, as sketched below. For dates, see Datetime Operations.
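When a single file mixes date formats, as the invalid example above does, one approach is to load the column as a string and try each expected format in turn. A sketch, assuming the hypothetical employees_dates_invalid.csv and two candidate formats:
from pyspark.sql.functions import coalesce, col, to_timestamp
# Load hire_date as a plain string, then attempt each expected format
df_dates_raw = spark.read.csv("employees_dates_invalid.csv", header=True)
df_dates_fixed = df_dates_raw.withColumn(
    "hire_date",
    coalesce(
        to_timestamp(col("hire_date"), "yyyy-MM-dd"),
        to_timestamp(col("hire_date"), "MM-dd-yyyy")
    )
)
# Note: with ANSI mode enabled, to_timestamp raises on unparseable input instead of
# returning null; try_to_timestamp (Spark 3.5+) is the safer variant there.
df_dates_fixed.show(truncate=False)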
How to Create a DataFrame from a CSV File with Complex Nested Structures
Complex nested structures, like JSON-like columns for employee skills, require nested schemas, as seen in DataFrame UDFs. Assume employees_complex.csv with a JSON string column:
employee_id,name,skills
E001,Alice,"[{'year': 2023, 'certification': 'Python'}, {'year': 2024, 'certification': 'Spark'}]"
E002,Bob,"[{'year': 2022, 'certification': 'Java'}]"
E003,Cathy,"[]"
E004,David,"[{'year': 2021, 'certification': 'Scala'}, {'year': 2023, 'certification': 'AWS'}]"
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
spark = SparkSession.builder.appName("ComplexCSV").getOrCreate()
# Schema for the JSON array stored in the skills column
skills_schema = ArrayType(StructType([
    StructField("year", IntegerType(), True),
    StructField("certification", StringType(), True)
]))
# Load the CSV, then parse the JSON string column into an array of structs
df_complex = spark.read.csv("employees_complex.csv", header=True) \
    .withColumn("skills", from_json(col("skills"), skills_schema))
df_complex.show(truncate=False)
Output:
+-----------+-----+---------------------------------------+
|employee_id|name |skills |
+-----------+-----+---------------------------------------+
|E001 |Alice|[{2023, Python}, {2024, Spark}] |
|E002 |Bob |[{2022, Java}] |
|E003 |Cathy|[] |
|E004 |David|[{2021, Scala}, {2023, AWS}] |
+-----------+-----+---------------------------------------+
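Once skills is a proper array of structs, you can flatten it for analysis with explode_outer, which keeps employees whose skills array is empty. A short sketch, assuming the df_complex DataFrame above:
from pyspark.sql.functions import explode_outer, col
# One output row per skill; Cathy (empty array) is kept with a null skill
df_skills = df_complex.select("employee_id", "name", explode_outer("skills").alias("skill")) \
    .select("employee_id", "name", col("skill.year").alias("year"), col("skill.certification").alias("certification"))
df_skills.show(truncate=False)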
Error to Watch: Invalid JSON doesn't necessarily fail loudly; from_json returns null for values it cannot parse, so bad rows slip through with a null skills value:
employee_id,name,skills
E001,Alice,"[{'year': 2023, 'certification': 'Python'}]"
E002,Bob,"[{year: 2022}]" # Invalid JSON
df_complex_invalid = spark.read.csv("employees_complex_invalid.csv", header=True) \
    .withColumn("skills", from_json(col("skills"), skills_schema))
df_complex_invalid.show(truncate=False)  # skills is null for the row with invalid JSON
Fix: Filter or repair rows whose parsed skills value is null (df_complex_invalid.filter(col("skills").isNull())), or validate the raw file before loading. Note that the skills cell contains commas, so a naive line.split(",") cannot isolate it; use a real CSV parser, as in the sketch below.
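A minimal validation sketch for the raw file, assuming the single-quoted style used in employees_complex.csv; because the embedded objects use single quotes, ast.literal_eval stands in for json.loads, which accepts only double-quoted JSON:
import ast
import csv
# Parse the CSV properly (the skills cell contains commas), then check each skills value
with open("employees_complex.csv", newline="") as f:
    rows = list(csv.DictReader(f))
bad = []
for row in rows:
    try:
        if not isinstance(ast.literal_eval(row["skills"]), list):
            bad.append(row["employee_id"])
    except (ValueError, SyntaxError):
        bad.append(row["employee_id"])
assert not bad, f"Invalid skills values for: {bad}"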
How to Fix Common DataFrame Creation Errors
Errors can derail CSV-to-PySpark DataFrame creation. Here are three key issues from the scenarios above, with fixes:
Malformed CSV Rows: Inconsistent column counts raise errors under mode="FAILFAST" and load as nulls under the default PERMISSIVE mode. Fix by cleaning the file or capturing bad rows with a _corrupt_record column. Validate: with open("employees.csv") as f: assert all(len(line.split(",")) == 4 for line in f.readlines()[1:]), "Inconsistent CSV columns". Check row counts with df.count().
Data-Schema Mismatch: Values that don't fit the declared type, like text in an IntegerType column, fail under FAILFAST and become nulls under PERMISSIVE. Fix by cleaning the source data or by reading the column as a string, casting it, and inspecting rows where the cast returned null. Validate: with open("employees.csv") as f: assert all(line.split(",")[2].strip().isdigit() for line in f.readlines()[1:]), "Invalid data". Check the schema with df.printSchema().
Invalid Nested Structures: Malformed JSON in string columns parses to null rather than failing loudly. Fix by parsing with from_json(col("skills"), schema) and filtering rows where the result is null; validate the raw file with a real CSV parser, as sketched in the nested-structures section above.
For more, see Error Handling and Debugging.
Wrapping Up Your DataFrame Creation Mastery
Creating a PySpark DataFrame from a CSV file is a vital skill, and PySpark’s read.csv method makes it easy to handle everything from simple to complex scenarios. These techniques will level up your ETL pipelines. Try them in your next Spark job, and share tips or questions in the comments or on X. Keep exploring with DataFrame Operations!