How to Filter Rows Using a Regular Expression in a PySpark DataFrame: The Ultimate Guide
Published on April 17, 2025
Diving Straight into Filtering Rows with Regular Expressions in a PySpark DataFrame
Filtering rows in a PySpark DataFrame using a regular expression (regex) is a powerful technique for data engineers using Apache Spark, enabling precise pattern-based data extraction. Whether you're searching for names matching a specific format, identifying records with complex string patterns, or refining datasets for analysis, regex filtering is essential for advanced data processing in ETL pipelines. This comprehensive guide explores the syntax and steps for filtering rows using regex, with examples covering basic regex filtering, combining with other conditions, nested data, and SQL-based approaches. Each section addresses a specific aspect of regex filtering, supported by practical code, error handling, and performance optimization strategies to build robust pipelines. The primary method, filter() with rlike(), is explained with all relevant considerations. Let’s match those patterns! For more on PySpark, see PySpark Fundamentals.
Filtering Rows with a Regular Expression
The primary method for filtering rows in a PySpark DataFrame is the filter() method (or its alias where()), combined with the rlike() function to check if a column’s string values match a regular expression pattern. This approach is ideal for ETL pipelines needing to select records based on complex string patterns, such as names starting with a specific letter or emails with a particular domain.
Understanding filter(), where(), and rlike() Parameters
- filter(condition) or where(condition):
- condition (Column or str, required): A boolean expression defining the filtering criteria, such as col("column").rlike(pattern) or a SQL-like string (e.g., "column RLIKE pattern").
- Returns: A new DataFrame containing only the rows where the condition evaluates to True.
- Note: filter() and where() are aliases and fully interchangeable; where() simply reads more naturally to SQL users.
- rlike(pattern) (method on Column objects):
- pattern (str, required): The regular expression pattern to match against the column’s string values, using Java regex syntax (e.g., "^A.*" for strings starting with "A").
- Returns: A Column expression evaluating to True if the column’s value matches the pattern, False otherwise.
- Note: rlike() is case-sensitive and evaluates to null for null values, so filter() drops those rows. It supports standard regex constructs like ^ (start), $ (end), .* (any characters), and [A-Za-z] (character ranges).
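To make the interchangeable forms above concrete, here is a minimal sketch (assuming a DataFrame df with a string name column, like the one created in the example below) showing the same filter written three ways:
from pyspark.sql.functions import col
# Column-expression form
df.filter(col("name").rlike("^[AB]"))
# where() is an alias and accepts the same Column expression
df.where(col("name").rlike("^[AB]"))
# SQL-style string condition
df.where("name RLIKE '^[AB]'")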
Here’s an example filtering employees whose names start with "A" or "B":
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize SparkSession
spark = SparkSession.builder.appName("RegexFilter").getOrCreate()
# Create DataFrame
data = [
("E001", "Alice", 25, 75000.0, "HR"),
("E002", "Bob", 30, 82000.5, "IT"),
("E003", "Cathy", 28, 90000.75, "HR"),
("E004", "David", 35, 100000.25, "IT"),
("E005", "Eve", 28, 78000.0, "Finance")
]
df = spark.createDataFrame(data, ["employee_id", "name", "age", "salary", "department"])
# Filter rows where name starts with "A" or "B"
filtered_df = df.filter(col("name").rlike("^[AB].*"))
filtered_df.show(truncate=False)
Output:
+-----------+-----+---+-------+----------+
|employee_id|name |age|salary |department|
+-----------+-----+---+-------+----------+
|E001 |Alice|25 |75000.0|HR |
|E002 |Bob |30 |82000.5|IT |
+-----------+-----+---+-------+----------+
This filters rows where name starts with "A" or "B" using the regex pattern ^[AB].*, returning two rows (E001, E002). The pattern ^ anchors the start of the string, [AB] matches "A" or "B", and .* matches any following characters. Validate:
assert filtered_df.count() == 2, "Incorrect row count"
assert "Alice" in [row["name"] for row in filtered_df.select("name").collect()], "Expected name missing"
Error to Watch: Invalid regex pattern fails:
try:
    filtered_df = df.filter(col("name").rlike("[A-B"))  # Unclosed bracket
    filtered_df.show()
except Exception as e:
    print(f"Error: {e}")
Output:
Error: Invalid regex pattern
Fix: Validate the pattern before using it (Python's re and Spark's Java regex dialects differ slightly, so treat this as a basic sanity check):
import re
try:
    re.compile("^[AB].*")
except re.error:
    raise ValueError("Invalid regex pattern")
Combining Regex Filtering with Other Conditions
To filter rows based on a regex pattern alongside other conditions, combine rlike() with additional criteria using logical operators (& for AND, | for OR, ~ for NOT). This is useful for complex ETL filtering, such as selecting records with specific name patterns and salary thresholds.
from pyspark.sql.functions import col
# Filter rows where name starts with "A" or "B" and salary > 80000
filtered_df = df.filter((col("name").rlike("^[AB].*")) & (col("salary") > 80000))
filtered_df.show(truncate=False)
Output:
+-----------+----+---+-------+----------+
|employee_id|name|age|salary |department|
+-----------+----+---+-------+----------+
|E002 |Bob |30 |82000.5|IT |
+-----------+----+---+-------+----------+
This filters rows where name starts with "A" or "B" and salary exceeds 80,000, returning one row (E002). Validate:
assert filtered_df.count() == 1, "Incorrect row count"
assert "Bob" in [row["name"] for row in filtered_df.select("name").collect()], "Expected name missing"
Error to Watch: Null values in regex filtering evaluate to null and are silently excluded:
# Example with nulls
data_with_nulls = data + [("E006", None, 32, 85000.0, "IT")]
df_nulls = spark.createDataFrame(data_with_nulls, ["employee_id", "name", "age", "salary", "department"])
filtered_df = df_nulls.filter(col("name").rlike("^[AB].*"))
# Excludes E006 due to null name
Fix: Handle nulls explicitly:
filtered_df = df_nulls.filter(col("name").rlike("^[AB].*") & col("name").isNotNull())
assert filtered_df.count() == 2, "Nulls not handled correctly"
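Another way to handle nulls, if you prefer a single condition, is to substitute an empty string before matching; a sketch using coalesce() on the same df_nulls:
from pyspark.sql.functions import coalesce, col, lit
# Treat null names as empty strings, which the pattern will not match
filtered_df = df_nulls.filter(coalesce(col("name"), lit("")).rlike("^[AB].*"))
assert filtered_df.count() == 2, "Nulls not handled correctly"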
Filtering Nested Data with Regular Expressions
Nested DataFrames, with structs or arrays, are common in complex datasets like employee contact details. Filtering rows where a nested field such as contact.email matches a regex pattern requires dot notation with rlike(). This is crucial for hierarchical data in ETL pipelines.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("NestedRegexFilter").getOrCreate()
# Define schema with nested structs
schema = StructType([
StructField("employee_id", StringType(), False),
StructField("name", StringType(), True),
StructField("contact", StructType([
StructField("phone", LongType(), True),
StructField("email", StringType(), True)
]), True),
StructField("department", StringType(), True)
])
# Create DataFrame
data = [
("E001", "Alice", (1234567890, "alice@company.com"), "HR"),
("E002", "Bob", (None, "bob@company.com"), "IT"),
("E003", "Cathy", (5555555555, "cathy@other.com"), "HR"),
("E004", "David", (9876543210, "david@company.com"), "IT")
]
df_nested = spark.createDataFrame(data, schema)  # Named df_nested so the flat df from the first example stays available
# Filter rows where contact.email ends with "company.com"
filtered_df = df.filter(col("contact.email").rlike(".*@company\\.com$"))
filtered_df.show(truncate=False)
Output:
+-----------+-----+--------------------------------+----------+
|employee_id|name |contact |department|
+-----------+-----+--------------------------------+----------+
|E001 |Alice|[1234567890, alice@company.com] |HR |
|E002 |Bob |[null, bob@company.com] |IT |
|E004 |David|[9876543210, david@company.com] |IT |
+-----------+-----+--------------------------------+----------+
This filters rows where contact.email ends with "company.com" using the regex .*@company\.com$, returning three rows (E001, E002, E004). The pattern .* matches any characters, @company\.com matches the literal domain (with the dot escaped), and $ anchors the end of the string. Validate:
assert filtered_df.count() == 3, "Incorrect row count"
assert "alice@company.com" in [row["contact"]["email"] for row in filtered_df.collect()], "Expected email missing"
Error to Watch: Invalid nested field fails:
try:
    filtered_df = df_nested.filter(col("contact.invalid_field").rlike(".*@company\\.com$"))
    filtered_df.show()
except Exception as e:
    print(f"Error: {e}")
Output:
Error: StructField 'contact' does not contain field 'invalid_field'
Fix: Validate nested field:
assert "email" in [f.name for f in df.schema["contact"].dataType.fields], "Nested field missing"
Filtering Using SQL Queries
For SQL-based ETL workflows or teams familiar with database querying, SQL queries via temporary views offer an intuitive way to filter rows using regular expressions. The RLIKE operator in Spark SQL provides regex-based filtering, mirroring rlike() in the DataFrame API.
# Create temporary view from the flat employees DataFrame (df) built in the first example
df.createOrReplaceTempView("employees")
# Filter rows where name starts with "A" or "B" using SQL
filtered_df = spark.sql("SELECT * FROM employees WHERE name RLIKE '^[AB].*'")
filtered_df.show(truncate=False)
Output:
+-----------+-----+---+-------+----------+
|employee_id|name |age|salary |department|
+-----------+-----+---+-------+----------+
|E001 |Alice|25 |75000.0|HR |
|E002 |Bob |30 |82000.5|IT |
+-----------+-----+---+-------+----------+
This filters rows where name starts with "A" or "B" using SQL’s RLIKE operator. Validate:
assert filtered_df.count() == 2, "Incorrect row count"
assert "Bob" in [row["name"] for row in filtered_df.select("name").collect()], "Expected name missing"
Error to Watch: Unregistered view fails:
try:
    filtered_df = spark.sql("SELECT * FROM nonexistent WHERE name RLIKE '^[AB].*'")
    filtered_df.show()
except Exception as e:
    print(f"Error: {e}")
Output:
Error: Table or view not found: nonexistent
Fix: Verify view:
assert "employees" in [v.name for v in spark.catalog.listTables()], "View missing"
df.createOrReplaceTempView("employees")
Optimizing Performance for Regex Filtering
Filtering rows with regular expressions involves scanning the DataFrame and evaluating complex string operations, which can be computationally expensive for large datasets. Optimize performance to ensure efficient data extraction:
- Select Relevant Columns: Reduce data scanned:
df = df.select("employee_id", "name", "department")
- Push Down Filters: Apply filters early:
df = df.filter(col("name").rlike("^[AB].*"))
- Partition Data: Repartition by a frequently filtered column (or use partitionBy when writing):
df = df.repartition("department")
- Cache Intermediate Results: Cache filtered DataFrame if reused:
filtered_df.cache()
Example optimized filter:
optimized_df = df.select("employee_id", "name", "department") \
.filter((col("name").rlike("^[AB].*")) & (col("name").isNotNull())) \
.repartition("department")
optimized_df.show(truncate=False)
Monitor performance via the Spark UI, focusing on scan and filter metrics.
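Alongside the Spark UI, explain() prints a DataFrame's physical plan, which is a quick way to confirm where the regex filter is applied; for example:
# Inspect the physical plan to verify early filtering
optimized_df.explain()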
Error to Watch: Complex regex patterns on large datasets slow performance:
# Example with a broad regex on a large DataFrame (size simulated here with a cross join, for illustration only)
large_df = df.crossJoin(spark.range(1000000))
filtered_df = large_df.filter(col("name").rlike("^[A-Z][a-z]+.*"))  # Regex evaluated against every row
Fix: Apply the regex filter before the expensive step and keep patterns simple and anchored:
filtered_first = df.filter(col("name").rlike("^[AB]"))  # Prune rows early, before expanding the data
large_df = filtered_first.crossJoin(spark.range(1000000))
Wrapping Up Your Regex Filtering Mastery
Filtering rows using regular expressions in a PySpark DataFrame is a vital skill for precise pattern-based data extraction in ETL pipelines. Whether you’re using filter() with rlike() for regex matches, combining with other conditions, handling nested data with dot notation, or leveraging SQL queries with RLIKE, Spark provides powerful tools to address complex data processing needs. By mastering these techniques, optimizing performance, and anticipating errors, you can efficiently refine datasets, enabling accurate analyses and robust applications. These methods will enhance your data engineering workflows, empowering you to manage regex-based filtering with confidence.
Try these approaches in your next Spark job, and share your experiences, tips, or questions in the comments or on X. Keep exploring with DataFrame Operations to deepen your PySpark expertise!