How to Filter Rows Where a Column Matches a Pattern in a PySpark DataFrame: The Ultimate Guide
Diving Straight into Pattern-Based Filtering in a PySpark DataFrame
Filtering rows in a PySpark DataFrame based on patterns is a powerful technique for data engineers and analysts working with Apache Spark in ETL pipelines, data cleaning, or analytics. Whether you're searching for names starting with "A" or emails ending with "@company.com", pattern-based filtering lets you extract rows where a column matches a specific string pattern. This is especially useful for text processing, data validation, and user-driven queries. This guide targets data engineers with intermediate PySpark knowledge; if you're new to PySpark, check out our PySpark Fundamentals.
We’ll explore the basics of pattern matching with like() and regular expressions, advanced filtering with dynamic patterns, handling nested data, using SQL expressions, and optimizing performance. Each section includes practical code examples, outputs, and common pitfalls, explained in a clear, conversational tone to keep things approachable and relevant.
Understanding Pattern-Based Filtering in PySpark
Pattern-based filtering in PySpark involves checking if a column’s string values match a specified pattern, such as a prefix, suffix, or regular expression. PySpark offers methods like like() for SQL-style patterns (using wildcards like % and _) and rlike() for regular expressions, which provide more flexibility for complex patterns. These methods are used with filter() to select matching rows, making them ideal for tasks like finding specific text patterns in large datasets.
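Beyond like() and rlike(), the Column API also offers startswith(), endswith(), and contains() for simple prefix, suffix, and substring checks that don't need a pattern at all. Here's a minimal sketch (assuming a DataFrame named employees with string columns name and email, like the ones built in the examples below):
from pyspark.sql.functions import col
# Simple string checks that don't require wildcards or regex
prefix_df = employees.filter(col("name").startswith("A"))           # same effect as like("A%")
suffix_df = employees.filter(col("email").endswith("@company.com"))
substr_df = employees.filter(col("name").contains("li"))            # e.g., "Alice", "Charlie"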
Basic Pattern Filtering with like()
Let’s filter employees whose names start with "A" using like().
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize Spark session
spark = SparkSession.builder.appName("PatternFilterExample").getOrCreate()
# Create employees DataFrame
employees_data = [
(1, "Alice", 30, 50000, 101),
(2, "Bob", 25, 45000, 102),
(3, "Charlie", 35, 60000, 103),
(4, "Adam", 28, 40000, 101)
]
employees = spark.createDataFrame(employees_data, ["employee_id", "name", "age", "salary", "dept_id"])
# Filter names starting with "A"
filtered_df = employees.filter(col("name").like("A%"))
# Show results
filtered_df.show()
# Output:
# +-----------+-----+---+------+-------+
# |employee_id| name|age|salary|dept_id|
# +-----------+-----+---+------+-------+
# |          1|Alice| 30| 50000|    101|
# |          4| Adam| 28| 40000|    101|
# +-----------+-----+---+------+-------+
# Validate row count
assert filtered_df.count() == 2, "Expected 2 rows after pattern filtering"
What’s Happening Here? The like("A%") method checks if the name column starts with "A", where % is a wildcard matching any characters after "A". The filter() method keeps rows where this condition is true. This is a simple way to search for patterns, like finding all names with a specific prefix.
Key Methods:
- col(column).like(pattern): Matches the column value against a SQL-style pattern (% for any characters, _ for a single character; see the sketch after this list).
- filter(condition): Retains rows where the condition evaluates to True.
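To illustrate the _ wildcard, here's a minimal sketch using the same employees DataFrame: each _ matches exactly one character, so "A___" matches four-letter names that start with "A".
# "_" matches exactly one character: "A___" matches "Adam" (four letters)
# but not "Alice" (five letters)
filtered_df = employees.filter(col("name").like("A___"))
filtered_df.show()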
Common Mistake: Incorrect wildcard usage.
# Incorrect: Wrong wildcard
filtered_df = employees.filter(col("name").like("A*")) # * is not a valid wildcard for like()
# Fix: Use % for like()
filtered_df = employees.filter(col("name").like("A%"))
Error Output: Empty result. like() treats * as a literal character, so "A*" only matches the exact string "A*"; no exception is raised.
Fix: Use % for zero or more characters in like() patterns.
Advanced Pattern Filtering with Regular Expressions
For more complex patterns, PySpark’s rlike() method supports regular expressions (regex), allowing precise matching, such as emails with specific domains or names with certain formats.
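As a quick illustration before the email example, here's a sketch reusing the employees DataFrame from above: rlike() accepts full Java regular expressions, so you can express anchors and alternation that like() cannot.
# Names starting with "A" or "B" (the ^ anchors the match to the start of the string)
filtered_df = employees.filter(col("name").rlike(r"^(A|B)"))
filtered_df.show()  # Alice, Bob, Adam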
Example: Filtering Emails with a Regex Pattern
Let’s filter employees whose emails end with "@company.com" using rlike().
# Update employees DataFrame with email column
employees_data = [
(1, "Alice", "alice@company.com", 101),
(2, "Bob", "bob@other.com", 102),
(3, "Charlie", "charlie@company.com", 103),
(4, "Adam", "adam@gmail.com", 101)
]
employees = spark.createDataFrame(employees_data, ["employee_id", "name", "email", "dept_id"])
# Filter emails ending with @company.com
filtered_df = employees.filter(col("email").rlike("@company\.com$"))
# Show results
filtered_df.show()
# Output:
# +-----------+-------+-------------------+-------+
# |employee_id|   name|              email|dept_id|
# +-----------+-------+-------------------+-------+
# |          1|  Alice|  alice@company.com|    101|
# |          3|Charlie|charlie@company.com|    103|
# +-----------+-------+-------------------+-------+
# Validate
assert filtered_df.count() == 2, "Expected 2 rows"
What’s Going On? The regex pattern @company\.com$ matches emails ending with "@company.com". The \. escapes the dot so it matches a literal period, and $ anchors the match to the end of the string. The rlike() method checks whether the email column matches this pattern, and filter() keeps matching rows. This is perfect for complex text patterns, like validating email formats or extracting specific codes.
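Because rlike() uses Java regular expressions, you can also embed flags directly in the pattern. For example, a case-insensitive match (a sketch, assuming the data might contain mixed-case domains) looks like this:
# (?i) turns on case-insensitive matching, so "alice@COMPANY.COM" would also match
filtered_df = employees.filter(col("email").rlike(r"(?i)@company\.com$"))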
Common Mistake: Unescaped regex characters.
# Incorrect: Unescaped dot
filtered_df = employees.filter(col("email").rlike("@company.com$")) # Matches unintended patterns
# Fix: Escape special characters
filtered_df = employees.filter(col("email").rlike("@company\.com$"))
Error Output: No exception, but incorrect matches (e.g., "@companyXcom" would pass) because the unescaped dot matches any character.
Fix: Escape special regex characters (e.g., . as \.) and use a raw string so Python doesn't mangle the backslash.
Filtering Nested Data with Pattern Matching
Nested data, like structs, is common in semi-structured datasets. You can apply pattern matching to nested fields using dot notation, making it flexible for JSON-like data.
Example: Filtering by Nested Email Pattern
Suppose employees has a contact struct with email and phone. We want to filter rows where the email matches a dynamic pattern.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Create employees with nested contact data
schema = StructType([
StructField("employee_id", IntegerType()),
StructField("name", StringType()),
StructField("contact", StructType([
StructField("email", StringType()),
StructField("phone", StringType())
])),
StructField("dept_id", IntegerType())
])
employees_data = [
(1, "Alice", {"email": "alice@company.com", "phone": "123-456-7890"}, 101),
(2, "Bob", {"email": "bob@other.com", "phone": "234-567-8901"}, 102),
(3, "Charlie", {"email": "charlie@company.com", "phone": "345-678-9012"}, 103)
]
employees = spark.createDataFrame(employees_data, schema)
# Define dynamic pattern
email_pattern = "@company\.com$"
# Filter nested email
filtered_df = employees.filter(col("contact.email").rlike(email_pattern))
# Show results
filtered_df.show()
# Output:
# +-----------+-------+--------------------+-------+
# |employee_id|   name|             contact|dept_id|
# +-----------+-------+--------------------+-------+
# |          1|  Alice|{alice@company.co...|    101|
# |          3|Charlie|{charlie@company....|    103|
# +-----------+-------+--------------------+-------+
# Validate
assert filtered_df.count() == 2
What’s Happening? We use a variable email_pattern to define the regex dynamically. The col("contact.email").rlike(email_pattern) expression applies the pattern to the nested email field, filtering rows where the email ends with "@company.com". This is ideal for dynamic inputs, like user-specified patterns.
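If the domain comes from user input, it's safer to build the regex programmatically so special characters are escaped for you. A minimal sketch using Python's re.escape (the user_domain value here is just an illustration):
import re
user_domain = "@company.com"                  # e.g., supplied at runtime
email_pattern = re.escape(user_domain) + "$"  # escapes the "." automatically
filtered_df = employees.filter(col("contact.email").rlike(email_pattern))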
Common Mistake: Incorrect nested field reference.
# Incorrect: Non-existent field
filtered_df = employees.filter(col("contact.mail").rlike(email_pattern)) # Raises AnalysisException
# Fix: Verify schema
employees.printSchema()
filtered_df = employees.filter(col("contact.email").rlike(email_pattern))
Error Output: AnalysisException: cannot resolve 'contact.mail'.
Fix: Use printSchema() to confirm nested field names.
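You can also verify nested field names programmatically, which is handy in dynamic pipelines. A small sketch:
# List the fields inside the contact struct and check that "email" exists
contact_fields = employees.schema["contact"].dataType.fieldNames()
assert "email" in contact_fields, f"Expected 'email' field, found {contact_fields}"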
Pattern Matching with SQL Expressions
PySpark’s SQL module supports pattern matching with LIKE and REGEXP (or RLIKE), offering a familiar syntax for SQL users. By registering a DataFrame as a view, you can use SQL to filter rows based on patterns.
Example: SQL-Based Pattern Filtering
Let’s filter employees whose names start with "A" using SQL LIKE.
# Recreate the original employees DataFrame (it was replaced by the email and nested versions above)
employees_data = [(1, "Alice", 30, 50000, 101), (2, "Bob", 25, 45000, 102),
                  (3, "Charlie", 35, 60000, 103), (4, "Adam", 28, 40000, 101)]
employees = spark.createDataFrame(employees_data, ["employee_id", "name", "age", "salary", "dept_id"])
# Register DataFrame as a temporary view
employees.createOrReplaceTempView("employees")
# SQL query with LIKE
filtered_df = spark.sql("""
SELECT *
FROM employees
WHERE name LIKE 'A%'
""")
# Show results
filtered_df.show()
# Output:
# +-----------+-----+---+------+-------+
# |employee_id| name|age|salary|dept_id|
# +-----------+-----+---+------+-------+
# |          1|Alice| 30| 50000|    101|
# |          4| Adam| 28| 40000|    101|
# +-----------+-----+---+------+-------+
# Validate
assert filtered_df.count() == 2
What’s Going On? The SQL LIKE 'A%' clause matches names starting with "A", equivalent to col("name").like("A%"). The DataFrame is registered as a view, and spark.sql() executes the query. This is a great option if you prefer SQL or need to integrate with SQL-based workflows.
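Spark SQL also supports regex matching with RLIKE (or the synonym REGEXP). As a sketch against the same view, the prefix filter could be written with a regex instead:
# Regex matching in SQL: names starting with "A"
filtered_df = spark.sql("SELECT * FROM employees WHERE name RLIKE '^A'")
filtered_df.show()  # Alice, Adam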
Common Mistake: Invalid SQL pattern syntax.
# Incorrect: Wrong pattern
spark.sql("SELECT * FROM employees WHERE name LIKE 'A*'") # Invalid wildcard
# Fix: Use % for LIKE
spark.sql("SELECT * FROM employees WHERE name LIKE 'A%'")
Error Output: Empty result due to incorrect wildcard.
Fix: Use % for SQL LIKE patterns, not *.
Optimizing Pattern-Based Filtering Performance
Pattern matching, especially with regex, can be computationally expensive on large datasets. Here are four strategies to optimize performance.
- Select Relevant Columns: Include only necessary columns to reduce the amount of data scanned and moved.
- Filter Early: Apply pattern filters early to minimize the dataset size.
- Partition Data: Partition by frequently filtered columns (e.g., dept_id) for faster queries; see the sketch after this list.
- Cache Results: Cache filtered DataFrames for reuse in multi-step pipelines.
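For the partitioning point above, here's a minimal sketch (the output path is illustrative): writing the data partitioned by dept_id lets later reads prune partitions before any pattern matching runs.
# Write partitioned by dept_id (illustrative path), then read it back;
# the dept_id filter prunes partitions before the pattern match is evaluated
employees.write.mode("overwrite").partitionBy("dept_id").parquet("/tmp/employees_by_dept")
partitioned = spark.read.parquet("/tmp/employees_by_dept")
filtered = partitioned.filter((col("dept_id") == 101) & col("name").like("A%"))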
Example: Optimized Pattern Filtering
# Recreate the employees DataFrame with an email column
employees = spark.createDataFrame([
    (1, "Alice", "alice@company.com", 101), (2, "Bob", "bob@other.com", 102),
    (3, "Charlie", "charlie@company.com", 103), (4, "Adam", "adam@gmail.com", 101)
], ["employee_id", "name", "email", "dept_id"])
# Define pattern (raw string avoids escape-sequence warnings)
email_pattern = r"@company\.com$"
# Select relevant columns and filter early
optimized_df = employees.select("employee_id", "name", "email") \
    .filter(col("email").rlike(email_pattern)) \
    .cache()
# Show results
optimized_df.show()
# Output:
# +-----------+-------+-------------------+
# |employee_id|   name|              email|
# +-----------+-------+-------------------+
# |          1|  Alice|  alice@company.com|
# |          3|Charlie|charlie@company.com|
# +-----------+-------+-------------------+
# Validate
assert optimized_df.count() == 2
What’s Happening? We recreate the email version of the DataFrame, select only employee_id, name, and email, apply the regex filter early, and cache the result. This reduces memory usage and speeds up downstream operations in multi-step pipelines.
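To confirm where the filter and column selection are applied, you can inspect the physical plan, and release the cache when downstream steps are done; a quick sketch:
# Inspect the physical plan to see where the filter and column selection run
optimized_df.explain()
# Release the cache once the DataFrame is no longer needed
optimized_df.unpersist()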
Wrapping Up Your Pattern-Based Filtering Mastery
Filtering PySpark DataFrame rows by pattern matching with like() and rlike() is a versatile skill for text processing and data validation. From basic wildcard searches to regex patterns, nested data, SQL expressions, and performance optimizations, you’ve got a robust toolkit for handling pattern-based filtering. Try these techniques in your next Spark project and share your insights on X. For more DataFrame operations, explore DataFrame Transformations.
More Spark Resources to Keep You Going
Published: April 17, 2025