How to Filter Rows Based on a Condition in a PySpark DataFrame: The Ultimate Guide
Published on April 17, 2025
Diving Straight into Filtering Rows in a PySpark DataFrame
Need to filter rows in a PySpark DataFrame—like selecting high-value customers or recent transactions—to focus your analysis or streamline an ETL pipeline? Filtering rows based on a condition is a core skill for data engineers working with Apache Spark: it lets you extract relevant data subsets efficiently and cut processing overhead. This guide dives into the syntax and steps for filtering rows in a PySpark DataFrame using conditions, with examples covering simple, complex, regex-based, nested, and SQL-based scenarios. We’ll tackle key errors to keep your pipelines robust. Let’s refine that data! For more on PySpark, see Introduction to PySpark.
Filtering Rows Based on a Condition
The primary way to filter rows in a PySpark DataFrame is the filter() method, or its alias where(), which returns a new DataFrame containing only the rows that meet the specified condition. Conditions can involve comparisons, logical operators, or functions applied to columns. The SparkSession, Spark’s unified entry point, supports these operations on distributed datasets. This approach is ideal for ETL pipelines that need to isolate specific data. Here’s the basic syntax:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("FilterRows").getOrCreate()
df = spark.createDataFrame(data, schema)
filtered_df = df.filter(col("column_name") > value)
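Note that where() is an alias for filter(), and both also accept a SQL-expression string instead of a Column, so these placeholder forms are interchangeable:
# where() is an alias for filter(); both also accept a SQL expression string
filtered_df = df.where(col("column_name") > value)
filtered_df = df.filter("column_name > value")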
Let’s apply it to an employee DataFrame with IDs, names, ages, salaries, and departments, filtering employees with salaries above 80,000:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize SparkSession
spark = SparkSession.builder.appName("FilterRows").getOrCreate()
# Create DataFrame
data = [
("E001", "Alice", 25, 75000.0, "HR"),
("E002", "Bob", 30, 82000.5, "IT"),
("E003", "Cathy", 28, 90000.75, "HR"),
("E004", "David", 35, 100000.25, "IT")
]
df = spark.createDataFrame(data, ["employee_id", "name", "age", "salary", "department"])
# Filter rows
filtered_df = df.filter(col("salary") > 80000)
filtered_df.show(truncate=False)
Output:
+-----------+-----+---+---------+----------+
|employee_id|name |age|salary   |department|
+-----------+-----+---+---------+----------+
|E002       |Bob  |30 |82000.5  |IT        |
|E003       |Cathy|28 |90000.75 |HR        |
|E004       |David|35 |100000.25|IT        |
+-----------+-----+---+---------+----------+
This creates a new DataFrame with rows where salary exceeds 80,000. Validate: assert filtered_df.count() == 3, "Unexpected row count". For SparkSession details, see SparkSession in PySpark.
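Because filter() is a lazy transformation, nothing executes until an action like show() or count() runs, so chaining filters is cheap. A quick sketch reusing the df above:
# Chained filters build a single logical plan; the action triggers execution
high_paid_it_df = df.filter(col("salary") > 80000).filter(col("department") == "IT")
print(high_paid_it_df.count())  # Action: runs the combined filter and prints 2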
Filtering Rows with a Simple Condition
Filtering rows based on a single condition, like selecting employees older than a certain age, is the most common use case for isolating data subsets in ETL tasks, such as preparing data for analysis, as seen in ETL Pipelines. The filter() method with col() is straightforward:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("SimpleFilter").getOrCreate()
# Create DataFrame
data = [
("E001", "Alice", 25, 75000.0),
("E002", "Bob", 30, 82000.5),
("E003", "Cathy", 28, 90000.75)
]
df = spark.createDataFrame(data, ["employee_id", "name", "age", "salary"])
# Filter rows
filtered_df = df.filter(col("age") > 27)
filtered_df.show(truncate=False)
Output:
+-----------+-----+---+--------+
|employee_id|name |age|salary  |
+-----------+-----+---+--------+
|E002       |Bob  |30 |82000.5 |
|E003       |Cathy|28 |90000.75|
+-----------+-----+---+--------+
This filters employees older than 27. Error to Watch: Invalid column fails:
try:
    filtered_df = df.filter(col("invalid_column") > 27)
    filtered_df.show()
except Exception as e:
    print(f"Error: {e}")
Output:
Error: Column 'invalid_column' does not exist
Fix: Verify column: assert "age" in df.columns, "Column missing". Check output: assert filtered_df.count() > 0, "No rows filtered".
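col() is not the only way to reference a column in a condition; bracket and attribute access on the DataFrame work too. A brief sketch reusing the df above:
# Three equivalent ways to express the same age condition
filtered_df = df.filter(col("age") > 27)  # col() function
filtered_df = df.filter(df["age"] > 27)   # Bracket access
filtered_df = df.filter(df.age > 27)      # Attribute access (avoid if the name clashes with a DataFrame attribute)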
Filtering Rows with Complex Conditions
Filtering rows based on multiple or complex conditions, like combining salary and department criteria, extends simple filtering for precise data extraction in ETL pipelines, as discussed in DataFrame Operations. Use logical operators (&, |, ~):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("ComplexFilter").getOrCreate()
# Create DataFrame
data = [
("E001", "Alice", 25, 75000.0, "HR"),
("E002", "Bob", 30, 82000.5, "IT"),
("E003", "Cathy", 28, 90000.75, "HR"),
("E004", "David", 35, 100000.25, "IT")
]
df = spark.createDataFrame(data, ["employee_id", "name", "age", "salary", "department"])
# Filter with complex condition
filtered_df = df.filter((col("salary") > 80000) & (col("department") == "IT"))
filtered_df.show(truncate=False)
Output:
+-----------+-----+---+---------+----------+
|employee_id|name |age|salary   |department|
+-----------+-----+---+---------+----------+
|E002       |Bob  |30 |82000.5  |IT        |
|E004       |David|35 |100000.25|IT        |
+-----------+-----+---+---------+----------+
This filters employees with salaries above 80,000 in the IT department. Validate: assert filtered_df.select("department").distinct().count() == 1, "Unexpected departments".
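Column helpers such as isin(), between(), and the ~ negation operator keep multi-clause conditions readable. A short sketch against the same df (the chosen values are illustrative):
# Membership, inclusive range, and negated conditions
it_or_hr_df = df.filter(col("department").isin("IT", "HR"))  # Membership test
mid_age_df = df.filter(col("age").between(26, 32))           # Inclusive range: 26 <= age <= 32
non_it_df = df.filter(~(col("department") == "IT"))          # Negation with ~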
Filtering Rows Using Regex Patterns
Filtering rows based on regex patterns, like matching names or IDs with specific formats, extends complex conditions for dynamic ETL filtering, as discussed in DataFrame Operations. Use rlike() for regex matching:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("RegexFilter").getOrCreate()
# Create DataFrame
data = [
("E001", "Alice", 25, 75000.0),
("E002", "Bob", 30, 82000.5),
("E003", "Cathy", 28, 90000.75),
("E004", "David", 35, 100000.25)
]
df = spark.createDataFrame(data, ["employee_id", "name", "age", "salary"])
# Filter rows with regex
filtered_df = df.filter(col("name").rlike("^[AB].*"))
filtered_df.show(truncate=False)
Output:
+-----------+-----+---+--------+
|employee_id|name |age|salary  |
+-----------+-----+---+--------+
|E001       |Alice|25 |75000.0 |
|E002       |Bob  |30 |82000.5 |
+-----------+-----+---+--------+
This filters names starting with A or B. Error to Watch: Invalid regex fails:
try:
    filtered_df = df.filter(col("name").rlike("[A-B"))  # Unclosed bracket
    filtered_df.show()
except Exception as e:
    print(f"Error: {e}")
Output:
Error: Invalid regex pattern
Fix: Test regex: import re; re.compile("^[AB].*"). Validate: assert filtered_df.count() > 0, "No rows matched".
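When a full regex is more than you need, simpler string predicates work as well. A short sketch of alternatives to rlike() on the same df:
# String predicates that avoid regex entirely
starts_with_a_df = df.filter(col("name").startswith("A"))  # Prefix match: Alice
like_df = df.filter(col("name").like("A%"))                # SQL LIKE wildcard: Alice
contains_df = df.filter(col("name").contains("at"))        # Substring match: Cathy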
Filtering Nested Data
Nested DataFrames, with structs or arrays, model complex relationships, like employee contact details. Filtering on nested fields is a common need in advanced ETL data extraction, as discussed in DataFrame UDFs:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("NestedFilter").getOrCreate()
# Define schema with nested structs
schema = StructType([
StructField("employee_id", StringType(), False),
StructField("name", StringType(), True),
StructField("contact", StructType([
StructField("phone", LongType(), True),
StructField("email", StringType(), True)
]), True)
])
# Create DataFrame
data = [
("E001", "Alice", (1234567890, "alice}example.com")),
("E002", "Bob", (9876543210, "bob}example.com")),
("E003", "Cathy", (None, None))
]
df = spark.createDataFrame(data, schema)
# Filter nested field
filtered_df = df.filter(col("contact.email").isNotNull())
filtered_df.show(truncate=False)
Output:
+-----------+-----+--------------------------------+
|employee_id|name |contact                         |
+-----------+-----+--------------------------------+
|E001       |Alice|[1234567890, alice@example.com] |
|E002       |Bob  |[9876543210, bob@example.com]   |
+-----------+-----+--------------------------------+
This filters rows with non-null email addresses. Error to Watch: Invalid nested field fails:
try:
    filtered_df = df.filter(col("contact.invalid_field").isNotNull())
    filtered_df.show()
except Exception as e:
    print(f"Error: {e}")
Output:
Error: StructField 'contact' does not contain field 'invalid_field'
Fix: Validate nested field: assert "email" in [f.name for f in df.schema["contact"].dataType.fields], "Nested field missing".
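The example above filters on a struct field; for array columns, functions like array_contains() and size() do the equivalent job. A minimal sketch with a hypothetical skills array (the skills_df data is invented for illustration):
from pyspark.sql.functions import array_contains, size
# Hypothetical DataFrame with an array column listing each employee's skills
skills_df = spark.createDataFrame(
    [("E001", ["python", "sql"]), ("E002", ["scala"])],
    ["employee_id", "skills"]
)
python_df = skills_df.filter(array_contains(col("skills"), "python"))  # Array holds "python"
multi_df = skills_df.filter(size(col("skills")) > 1)                   # More than one skill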
Filtering Rows Using SQL Queries
Using a SQL query via a temporary view is an alternative way to filter rows, well suited to SQL-driven ETL workflows, as seen in DataFrame Operations:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SQLFilter").getOrCreate()
# Create DataFrame
data = [
("E001", "Alice", 25, 75000.0, "HR"),
("E002", "Bob", 30, 82000.5, "IT"),
("E003", "Cathy", 28, 90000.75, "HR")
]
df = spark.createDataFrame(data, ["employee_id", "name", "age", "salary", "department"])
# Create temporary view
df.createOrReplaceTempView("employees")
# Filter using SQL
filtered_df = spark.sql("SELECT * FROM employees WHERE salary > 80000 AND department = 'HR'")
filtered_df.show(truncate=False)
Output:
+-----------+-----+---+---------+----------+
|employee_id|name |age|salary   |department|
+-----------+-----+---+---------+----------+
|E003       |Cathy|28 |90000.75 |HR        |
+-----------+-----+---+---------+----------+
This filters rows using SQL, ideal for SQL-driven pipelines. Validate view: assert "employees" in [v.name for v in spark.catalog.listTables()], "View missing".
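If only the condition itself needs SQL syntax, you can skip the temporary view and pass a SQL expression string to filter(), either directly or via expr(). A quick sketch on the same df:
from pyspark.sql.functions import expr
# SQL-style condition without registering a view
filtered_df = df.filter(expr("salary > 80000 AND department = 'HR'"))
same_df = df.filter("salary > 80000 AND department = 'HR'")  # A plain string works too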
How to Fix Common Filtering Errors
Errors can disrupt row filtering. Here are key issues, with fixes; a combined validation sketch follows the list:
- Non-Existent Column: Filtering on invalid columns fails. Fix: assert column in df.columns, "Column missing".
- Invalid Regex Pattern: Incorrect regex fails. Fix: Test: import re; re.compile(pattern). Ensure valid pattern syntax.
- Non-Existent View: SQL on unregistered views fails. Fix: assert view_name in [v.name for v in spark.catalog.listTables()], "View missing". Register: df.createOrReplaceTempView(view_name).
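These checks can be bundled into a small pre-flight helper; the sketch below is illustrative (check_before_filter is not a built-in) and covers column, regex, and view validation:
import re
def check_before_filter(df, column, pattern=None, view_name=None, spark=None):
    """Illustrative pre-flight checks to run before filtering."""
    assert column in df.columns, f"Column '{column}' missing"
    if pattern is not None:
        re.compile(pattern)  # Raises re.error if the pattern is malformed
    if view_name is not None and spark is not None:
        registered = [t.name for t in spark.catalog.listTables()]
        assert view_name in registered, f"View '{view_name}' missing"
# Example: validate before running the SQL filter from the previous section
check_before_filter(df, "salary", view_name="employees", spark=spark)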
For more, see Error Handling and Debugging.
Wrapping Up Your Row Filtering Mastery
Filtering rows based on a condition in a PySpark DataFrame is a vital skill, and Spark’s filter(), where(), regex patterns, and SQL queries make it easy to handle simple, complex, regex-based, nested, and SQL-based scenarios. These techniques will level up your ETL pipelines. Try them in your next Spark job, and share tips or questions in the comments or on X. Keep exploring with DataFrame Operations!