How to Filter Rows Where a Column Value Is Between Two Values in a PySpark DataFrame: The Ultimate Guide
Diving Straight into Filtering Rows Between Two Values in a PySpark DataFrame
Filtering rows in a PySpark DataFrame is a critical operation for data engineers and analysts working with Apache Spark in ETL pipelines, data cleaning, or analytics. One common task is filtering rows where a column value falls within a specific range, such as salaries between $40,000 and $60,000 or ages between 25 and 35. This operation is essential for isolating relevant data subsets for analysis, reporting, or machine learning. In this guide, we target data engineers with intermediate PySpark knowledge, providing a comprehensive exploration of filtering rows based on a range of values. If you're new to PySpark, start with our PySpark Fundamentals.
We'll cover the basics of range-based filtering, advanced filtering with multiple conditions, handling nested data, SQL-based approaches, and performance optimization. Each section includes practical code examples, outputs, and common pitfalls to ensure you master this essential PySpark technique.
Understanding Range-Based Filtering in PySpark: The Basics
PySpark provides several ways to filter rows where a column value lies between two values, most commonly the filter() method (or its alias where()) with comparison conditions, or the Column.between() method. Both approaches are intuitive and integrate seamlessly into DataFrame operations.
Basic Range Filtering Example
Let's filter employees with salaries between $45,000 and $55,000 using between().
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize Spark session
spark = SparkSession.builder.appName("RangeFilterExample").getOrCreate()
# Create employees DataFrame
employees_data = [
(1, "Alice", 30, 50000, 101),
(2, "Bob", 25, 45000, 102),
(3, "Charlie", 35, 60000, 103),
(4, "David", 28, 40000, 101)
]
employees = spark.createDataFrame(employees_data, ["employee_id", "name", "age", "salary", "dept_id"])
# Filter salaries between 45000 and 55000
filtered_df = employees.filter(col("salary").between(45000, 55000))
# Show results
filtered_df.show()
# Output:
# +-----------+-----+---+------+-------+
# |employee_id| name|age|salary|dept_id|
# +-----------+-----+---+------+-------+
# |          1|Alice| 30| 50000|    101|
# |          2|  Bob| 25| 45000|    102|
# +-----------+-----+---+------+-------+
# Validate row count
assert filtered_df.count() == 2, "Expected 2 rows after range filtering"
Explanation: The between() function checks if salary is inclusively between 45000 and 55000. It returns a boolean column used by filter() to select matching rows.
Primary Method Parameters:
- col(column).between(lowerBound, upperBound): Returns True if the column value is between lowerBound and upperBound (inclusive).
- filter(condition): Retains rows where the condition evaluates to True.
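Because between() is inclusive shorthand for a pair of comparisons, the same filter can be written with explicit >= and <= conditions. A minimal equivalent sketch against the employees DataFrame above (where() is simply an alias for filter()):
# Equivalent range filter using explicit inclusive comparisons
explicit_df = employees.where(
    (col("salary") >= 45000) & (col("salary") <= 55000)
)
explicit_df.show()  # Same two rows: Alice (50000) and Bob (45000)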
Common Error: Incorrect boundary values.
# Incorrect: Swapped boundaries
filtered_df = employees.filter(col("salary").between(55000, 45000)) # Returns empty result
# Fix: Ensure lowerBound < upperBound
filtered_df = employees.filter(col("salary").between(45000, 55000))
Error Output: None; the filter silently returns an empty DataFrame, because no value can be both greater than or equal to 55000 and less than or equal to 45000.
Fix: Ensure lowerBound is less than or equal to upperBound before calling between().
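If the bounds come from configuration or user input, a simple guard is to normalize them before calling between(). The safe_between helper below is a hypothetical convenience for illustration, not a PySpark API:
# Hypothetical helper: sort the bounds so a swapped pair still
# produces the intended inclusive range
def safe_between(column, bound_a, bound_b):
    low, high = sorted((bound_a, bound_b))
    return column.between(low, high)
# Behaves correctly even with swapped arguments
filtered_df = employees.filter(safe_between(col("salary"), 55000, 45000))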
Advanced Range Filtering with Multiple Conditions
Range filtering often combines with other conditions, such as filtering by department or age alongside salary.
Example: Filtering by Salary and Department
Let's filter employees with salaries between $40,000 and $60,000 in the HR department (dept_id=101).
# Filter by salary and department
filtered_df = employees.filter(
(col("salary").between(40000, 60000)) & (col("dept_id") == 101)
)
# Show results
filtered_df.show()
# Output:
# +-----------+-----+---+------+-------+
# |employee_id| name|age|salary|dept_id|
# +-----------+-----+---+------+-------+
# |          1|Alice| 30| 50000|    101|
# |          4|David| 28| 40000|    101|
# +-----------+-----+---+------+-------+
# Validate
assert filtered_df.count() == 2, "Expected 2 rows"
Explanation: We combine between() with a department condition using & (AND). Parentheses ensure proper operator precedence.
Common Error: Missing parentheses in complex conditions.
# Incorrect: Ambiguous operator precedence
filtered_df = employees.filter(col("salary").between(40000, 60000) & col("dept_id") == 101)
# Fix: Use parentheses
filtered_df = employees.filter((col("salary").between(40000, 60000)) & (col("dept_id") == 101))
Error Output: AnalysisException (typically a data type mismatch), because Python's & binds more tightly than ==, so Spark is asked to AND the boolean range check with the dept_id column and then compare that result to 101.
Fix: Enclose each condition in parentheses so each comparison is evaluated before the &.
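The same pattern extends to the other boolean operators, | (OR) and ~ (NOT), with each condition wrapped in parentheses. A short sketch against the same employees DataFrame:
# Salary in range OR employee in department 103
either_df = employees.filter(
    (col("salary").between(45000, 55000)) | (col("dept_id") == 103)
)
# Salary outside the range (below 45000 or above 55000)
outside_df = employees.filter(~col("salary").between(45000, 55000))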
Filtering Nested Data with Range Conditions
Range filtering can be applied to nested data, such as structs, using dot notation to access fields.
Example: Filtering by Nested Salary Data
Suppose we also have a nested variant of the employee data, where a details struct holds current_salary and bonus. We'll build it as a separate employees_nested DataFrame so the flat employees DataFrame remains available for the SQL and optimization examples below.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Create a DataFrame with nested salary details
schema = StructType([
StructField("employee_id", IntegerType()),
StructField("name", StringType()),
StructField("details", StructType([
StructField("current_salary", IntegerType()),
StructField("bonus", IntegerType())
])),
StructField("dept_id", IntegerType())
])
employees_data = [
(1, "Alice", {"current_salary": 50000, "bonus": 5000}, 101),
(2, "Bob", {"current_salary": 45000, "bonus": 3000}, 102),
(3, "Charlie", {"current_salary": 60000, "bonus": 7000}, 103)
]
employees_nested = spark.createDataFrame(employees_data, schema)
# Filter by current_salary between 45000 and 55000
filtered_df = employees.filter(col("details.current_salary").between(45000, 55000))
# Show results
filtered_df.show()
# Output:
# +-----------+-----+-------------+-------+
# |employee_id| name|      details|dept_id|
# +-----------+-----+-------------+-------+
# |          1|Alice|{50000, 5000}|    101|
# |          2|  Bob|{45000, 3000}|    102|
# +-----------+-----+-------------+-------+
# Validate
assert filtered_df.count() == 2
Explanation: The details.current_salary field is accessed using dot notation, and between() is applied as usual.
Common Error: Accessing non-existent nested fields.
# Incorrect: Non-existent field
filtered_df = employees.filter(col("details.salary").between(45000, 55000)) # Raises AnalysisException
# Fix: Verify schema
employees_nested.printSchema()
filtered_df = employees_nested.filter(col("details.current_salary").between(45000, 55000))
Error Output: AnalysisException: cannot resolve 'details.salary'.
Fix: Use printSchema() to confirm nested field names.
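An equivalent way to reach the nested column is getField(), which avoids the dotted path string and filters identically:
# Equivalent nested-field access using getField() instead of dot notation
filtered_df = employees_nested.filter(
    col("details").getField("current_salary").between(45000, 55000)
)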
Range Filtering with SQL Expressions
PySpark's SQL module supports range filtering using SQL syntax, which is intuitive for SQL users.
Example: SQL-Based Range Filtering
# Register DataFrame as a temporary view
employees.createOrReplaceTempView("employees")
# SQL query for salary range
filtered_df = spark.sql("""
SELECT *
FROM employees
WHERE salary BETWEEN 45000 AND 55000
""")
# Show results
filtered_df.show()
# Output:
# +-----------+-----+---+------+-------+
# |employee_id| name|age|salary|dept_id|
# +-----------+-----+---+------+-------+
# |          1|Alice| 30| 50000|    101|
# |          2|  Bob| 25| 45000|    102|
# +-----------+-----+---+------+-------+
# Validate
assert filtered_df.count() == 2
Explanation: The SQL BETWEEN operator is equivalent to col().between(). The DataFrame is registered as a view for SQL queries.
Common Error: Incorrect column name in SQL.
# Incorrect: Wrong column name
filtered_df = spark.sql("SELECT * FROM employees WHERE sal BETWEEN 45000 AND 55000") # Raises AnalysisException
# Fix: Use correct column name
filtered_df = spark.sql("SELECT * FROM employees WHERE salary BETWEEN 45000 AND 55000")
Error Output: AnalysisException: cannot resolve 'sal'.
Fix: Verify column names with printSchema() or describe().
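If you want SQL syntax without registering a temporary view, you can pass a SQL predicate to expr() from pyspark.sql.functions (filter() also accepts the predicate string directly):
from pyspark.sql.functions import expr
# SQL-style BETWEEN predicate evaluated directly inside filter()
filtered_df = employees.filter(expr("salary BETWEEN 45000 AND 55000"))
filtered_df.show()  # Same two rows as the temp-view query above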
Optimizing Range Filtering Performance
Range filtering on large datasets can be resource-intensive. Here are four strategies to optimize performance:
- Select Relevant Columns: Reduce the amount of data scanned and carried through the pipeline by selecting only the columns you need before filtering.
- Filter Early: Apply range filters as early as possible to reduce DataFrame size.
- Partition Data: Store or repartition data by the column you filter on so Spark can prune irrelevant partitions (see the sketch after this list).
- Cache Results: Cache filtered DataFrames for reuse in downstream operations.
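The partitioning tip applies when the data is written out. A minimal sketch, assuming a hypothetical output path and that dept_id is an acceptable partition key (for a continuous column like salary you would typically partition or bucket on a derived band instead):
# Write the data partitioned by dept_id (hypothetical path)
employees.write.mode("overwrite").partitionBy("dept_id").parquet("/tmp/employees_by_dept")
# Filters on the partition column prune directories at read time;
# the salary range filter is applied on top of the pruned data
pruned_df = (
    spark.read.parquet("/tmp/employees_by_dept")
    .filter((col("dept_id") == 101) & (col("salary").between(40000, 60000)))
)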
Example: Optimized Range Filtering
# Select relevant columns and filter early
optimized_df = employees.select("employee_id", "name", "salary") \
.filter(col("salary").between(45000, 55000))
# Cache result
optimized_df.cache()
# Show results
optimized_df.show()
# Output:
# +-----------+-----+------+
# |employee_id| name|salary|
# +-----------+-----+------+
# |          1|Alice| 50000|
# |          2|  Bob| 45000|
# +-----------+-----+------+
# Validate
assert optimized_df.count() == 2
Explanation: We select only employee_id, name, and salary, apply the filter early, and cache the result to optimize downstream operations.
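To confirm that the projection and filter really run early, inspect the physical plan with explain(); the Filter should sit next to the scan, and for file-based sources such as Parquet you would also see the predicate listed under PushedFilters:
# Print the physical plan to verify early filtering and column pruning
optimized_df.explain()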
Wrapping Up Your Range Filtering Mastery
Filtering PySpark DataFrames by a column value between two bounds is a powerful technique for data processing. From basic between() usage to advanced multi-condition filtering, nested data, and SQL queries, you've learned practical methods to streamline your ETL pipelines. Apply these techniques in your Spark projects and share your feedback on X. For more DataFrame operations, explore DataFrame Transformations.
More Spark Resources to Keep You Going
Published: April 17, 2025