How Watermarking Works in Spark Streaming: Managing Late Data in Real-Time Pipelines
Apache Spark’s Structured Streaming has revolutionized real-time data processing by combining the scalability of batch processing with near-instantaneous analytics. However, handling late or out-of-order data in streaming applications can be challenging, especially when computing aggregates over time. Spark’s watermarking feature addresses this by defining a threshold for how late data can be before it’s discarded, ensuring accurate and efficient stateful computations. In this comprehensive guide, we’ll explore what watermarking is, how it works, its role in Spark Streaming, and how to implement it effectively. With practical examples in Scala and PySpark, you’ll learn to build robust streaming pipelines that gracefully handle late data while maintaining performance and correctness.
The Challenge of Real-Time Streaming
Streaming applications process data as it arrives, enabling use cases like live dashboards, fraud detection, and IoT analytics. Unlike batch processing, where all data is available upfront, streaming deals with continuous, potentially unbounded data flows. This introduces complexities:
- Late Data: Events arriving after their expected processing time due to network delays or system issues.
- Out-of-Order Data: Events processed in a different order than their generation, common in distributed systems.
- State Management: Maintaining aggregates (e.g., running totals) over time without consuming excessive memory.
Spark Streaming, particularly its Structured Streaming API, processes streams as micro-batches—small datasets collected over short intervals (e.g., 1 second). For stateful operations like cumulative sums or windowed aggregates, late data can skew results or inflate state size, slowing down the pipeline. Watermarking provides a mechanism to handle these issues, balancing accuracy, latency, and resource usage. For a streaming primer, see Spark streaming getting started.
What is Watermarking?
Watermarking in Spark Streaming is a technique to manage late data in stateful operations by setting a threshold for how late an event can be before it’s ignored. It’s primarily used in Structured Streaming for aggregations, joins, and windowed computations, ensuring the system doesn’t wait indefinitely for delayed events. A watermark defines a time boundary, allowing Spark to:
- Discard Old Data: Ignore events older than the watermark to prevent state growth.
- Update State Correctly: Include late data within the watermark in ongoing computations.
- Maintain Accuracy: Ensure results reflect the intended time-based logic.
Watermarking is especially critical in stateful streaming, where state (e.g., running totals) is maintained across micro-batches (Spark stateful vs stateless streaming).
How Watermarking Works
Watermarking revolves around the concept of event time—the timestamp associated with when an event occurred, distinct from processing time (when Spark processes it). Here’s a step-by-step look at its mechanics in Structured Streaming:
Step 1: Event Time and Watermark Definition
Each event in a stream includes an event-time timestamp (e.g., when a sale occurred). You define a watermark by specifying:
- Timestamp Column: A column in your DataFrame containing event times.
- Watermark Delay: The maximum delay allowed for late data (e.g., 10 minutes).
For example:
df.withWatermark("timestamp", "10 minutes")
This tells Spark to accept events up to 10 minutes late relative to the latest event time seen.
Step 2: Watermark Calculation
Spark tracks the maximum event time across all processed events in the stream. The watermark is calculated as:
Watermark = Max Event Time - Watermark Delay
For instance, if the latest event has a timestamp of 2024-10-01 10:20:00 and the watermark delay is 10 minutes, the watermark is:
2024-10-01 10:10:00
Events with timestamps before this watermark are considered too late and discarded.
Step 3: State Management
In stateful operations (e.g., groupBy, windowed aggregates), Spark maintains a state store for intermediate results, such as running sums or counts. The watermark determines:
- Inclusion: Late events with timestamps after the watermark update the state.
- Expiration: Events before the watermark are dropped, and their state is removed.
This prevents the state store from growing indefinitely, ensuring memory efficiency.
Step 4: Processing Micro-Batches
For each micro-batch:
- Spark collects incoming data, including any late events.
- It compares event timestamps to the current watermark.
- Events within the watermark update the state and contribute to outputs.
- Events outside the watermark are discarded.
- Results are written to the sink (e.g., console, file) based on the output mode.
Step 5: Checkpointing
Watermarking relies on checkpointing to persist state and metadata across job restarts, ensuring fault tolerance (PySpark streaming checkpointing).
Watermarking is most relevant for stateful operations, as stateless streaming (e.g., simple filters) doesn’t maintain state across batches (Spark stateful vs stateless streaming).
Use Cases for Watermarking
Watermarking shines in scenarios requiring time-based computations:
- Running Aggregates: Calculating cumulative metrics, like total sales per region over time.
- Windowed Aggregations: Computing sums or counts within time windows (e.g., hourly sales) PySpark streaming windowing.
- Stream-Stream Joins: Matching events from two streams, accounting for delays PySpark streaming joins with static data.
- Deduplication: Removing duplicates based on event time, ensuring late duplicates don’t skew results.
Without watermarking, late data could:
- Cause incorrect results by missing updates.
- Inflate state size, leading to memory issues.
- Delay processing as the system waits for stragglers.
Setting Up a Streaming Environment
To demonstrate watermarking, we’ll use Structured Streaming with a Kafka source, processing sales events.
Prerequisites
- Spark Installation:
- Use Spark 3.5.x or later PySpark installation.
- Verify:
pyspark --version
- Kafka Setup:
- Install Kafka (e.g., 3.7.x) from kafka.apache.org.
- Start ZooKeeper and Kafka:
bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties
- Create a topic:
bin/kafka-topics.sh --create --topic sales --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
- Kafka Dependency:
- Include for Spark:
- PySpark: org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0.
- Scala: Add to SBT/Maven.
- Example configuration:
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
- Data Source:
- Send JSON messages to Kafka:
bin/kafka-console-producer.sh --topic sales --bootstrap-server localhost:9092
Sample input:
{"order_id": 1, "amount": 100, "region": "North", "timestamp": "2024-10-01T10:00:00"}
{"order_id": 2, "amount": 200, "region": "South", "timestamp": "2024-10-01T10:00:01"}
{"order_id": 3, "amount": 300, "region": "North", "timestamp": "2024-10-01T09:55:00"}
Implementing Watermarking in Structured Streaming
We’ll build a streaming application that aggregates sales totals by region, using watermarking to handle late data. Examples are provided in PySpark and Scala.
PySpark Watermarking Example
This pipeline reads from Kafka, applies a watermark, and computes a running sum of sales per region.
Code
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, sum as sum_
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
# Initialize Spark session
spark = SparkSession.builder \
.appName("WatermarkStreaming") \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
.config("spark.sql.streaming.checkpointLocation", "/tmp/watermark_checkpoint") \
.config("spark.sql.shuffle.partitions", "10") \
.getOrCreate()
# Define schema
schema = StructType([
StructField("order_id", IntegerType()),
StructField("amount", IntegerType()),
StructField("region", StringType()),
StructField("timestamp", TimestampType())
])
# Read from Kafka
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "sales") \
.option("startingOffsets", "earliest") \
.load()
# Parse JSON
parsed_df = df.selectExpr("CAST(value AS STRING)") \
.select(from_json(col("value"), schema).alias("data")) \
.select("data.*")
# Apply watermark
watermarked_df = parsed_df.withWatermark("timestamp", "10 minutes")
# Aggregate with state
aggregated_df = watermarked_df.groupBy("region").agg(sum_("amount").alias("total_amount"))
# Write to console
query = aggregated_df.writeStream \
.outputMode("complete") \
.format("console") \
.option("truncate", "false") \
.trigger(processingTime="5 seconds") \
.start()
query.awaitTermination()
Parameters Explained
- Spark Session:
- appName: Names the job for tracking.
- spark.jars.packages: Includes Kafka connector.
- spark.sql.streaming.checkpointLocation: Persists state for recovery.
- spark.sql.shuffle.partitions: Reduces shuffle partitions for efficiency.
- Read Stream:
- format("kafka"): Kafka source.
- kafka.bootstrap.servers: Kafka address.
- subscribe: Topic name (sales).
- startingOffsets: Reads from the beginning for testing.
- Parsing:
- from_json: Converts JSON to structured columns.
- TimestampType: Ensures timestamp is suitable for watermarking.
- Watermarking:
- withWatermark("timestamp", "10 minutes"): Sets a 10-minute delay threshold.
- Events older than the watermark are discarded.
- Aggregation:
- groupBy("region").agg(sum_("amount")): Computes a running sum, stateful across batches.
- Write Stream:
- outputMode("complete"): Outputs the full state table, suitable for aggregations.
- format("console"): Prints results.
- trigger(processingTime="5 seconds"): Processes every 5 seconds.
- start(): Begins streaming.
For streaming basics, see PySpark structured streaming overview.
Output
For input:
{"order_id": 1, "amount": 100, "region": "North", "timestamp": "2024-10-01T10:00:00"}
{"order_id": 2, "amount": 200, "region": "South", "timestamp": "2024-10-01T10:00:01"}
{"order_id": 3, "amount": 300, "region": "North", "timestamp": "2024-10-01T09:55:00"} # Late event
{"order_id": 4, "amount": 400, "region": "North", "timestamp": "2024-10-01T10:10:00"}
Console output after batches (assuming max event time reaches 2024-10-01 10:10:00):
-------------------------------------------
Batch: 1
-------------------------------------------
|region|total_amount|
|North |100 |
|South |200 |
-------------------------------------------
Batch: 2
-------------------------------------------
|region|total_amount|
|North |500 | # Includes late event (09:55:00) within watermark
|South |200 |
-------------------------------------------
Batch: 3
-------------------------------------------
|region|total_amount|
|North |900 | # Adds event at 10:10:00
|South |200 |
The late event (09:55:00) is included because it’s within the 10-minute watermark (10:10:00 - 10 minutes = 10:00:00). Older events would be discarded.
Scala Watermarking Example
The same pipeline in Scala:
Code
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json, sum}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.streaming.Trigger
object WatermarkStreaming {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("WatermarkStreaming")
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
.config("spark.sql.streaming.checkpointLocation", "/tmp/watermark_checkpoint")
.config("spark.sql.shuffle.partitions", "10")
.getOrCreate()
val schema = StructType(Seq(
StructField("order_id", IntegerType),
StructField("amount", IntegerType),
StructField("region", StringType),
StructField("timestamp", TimestampType)
))
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "sales")
.option("startingOffsets", "earliest")
.load()
val parsedDf = df.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), schema).alias("data"))
.select("data.*")
val watermarkedDf = parsedDf.withWatermark("timestamp", "10 minutes")
val aggregatedDf = watermarkedDf.groupBy("region")
.agg(sum("amount").alias("total_amount"))
val query = aggregatedDf.writeStream
.outputMode("complete")
.format("console")
.option("truncate", "false")
.trigger(Trigger.ProcessingTime("5 seconds"))
.start()
query.awaitTermination()
}
}
Running the Scala Application
- Package with SBT/Maven.
- Submit:
spark-submit --class WatermarkStreaming \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
target/your-app.jar
The output mirrors the PySpark example, accumulating totals with late data handled correctly.
Watermarking with Windowed Aggregations
Watermarking is often used with time windows to compute metrics over fixed intervals (e.g., hourly sales). Let’s extend the pipeline to aggregate sales by region within 5-minute windows.
PySpark Windowed Example
Code
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, sum as sum_, window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
spark = SparkSession.builder \
.appName("WindowedWatermark") \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
.config("spark.sql.streaming.checkpointLocation", "/tmp/windowed_checkpoint") \
.config("spark.sql.shuffle.partitions", "10") \
.getOrCreate()
schema = StructType([
StructField("order_id", IntegerType()),
StructField("amount", IntegerType()),
StructField("region", StringType()),
StructField("timestamp", TimestampType())
])
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "sales") \
.option("startingOffsets", "earliest") \
.load()
parsed_df = df.selectExpr("CAST(value AS STRING)") \
.select(from_json(col("value"), schema).alias("data")) \
.select("data.*")
watermarked_df = parsed_df.withWatermark("timestamp", "10 minutes")
windowed_df = watermarked_df.groupBy(
window(col("timestamp"), "5 minutes"),
col("region")
).agg(sum_("amount").alias("total_amount"))
query = windowed_df.writeStream \
.outputMode("append") \
.format("console") \
.option("truncate", "false") \
.trigger(processingTime="5 seconds") \
.start()
query.awaitTermination()
Parameters Explained
- Windowing:
- window(col("timestamp"), "5 minutes"): Groups events into 5-minute windows based on event time.
- Output Mode:
- append: Outputs only finalized windows (after watermark ensures no more late data).
- Watermark:
- Ensures late events within 10 minutes update the correct windows; older events are discarded.
Output
For input:
{"order_id": 1, "amount": 100, "region": "North", "timestamp": "2024-10-01T10:00:00"}
{"order_id": 2, "amount": 200, "region": "South", "timestamp": "2024-10-01T10:02:00"}
{"order_id": 3, "amount": 300, "region": "North", "timestamp": "2024-10-01T09:58:00"} # Late
Output after batches:
-------------------------------------------
Batch: 1
-------------------------------------------
|window |region|total_amount|
|2024-10-01T10:00:00, 10:05:00 |North |400 | # Includes late event
|2024-10-01T10:00:00, 10:05:00 |South |200 |
The late event (09:58:00) updates the 10:00–10:05 window, as it’s within the watermark.
Best Practices
Optimize watermarking with these tips:
- Choose Appropriate Delay: Set watermark delay based on expected latency (e.g., 10 minutes for moderate delays).
- Use Checkpointing: Persist state for recovery PySpark streaming checkpointing.
- Monitor State Size: Watch Spark UI for memory usage Spark how to debug Spark applications.
- Validate Timestamps: Ensure event-time columns are accurate PySpark printSchema.
- Tune Triggers: Balance latency with trigger settings PySpark streaming triggers.
- Combine Optimizations: Use with pruning and caching PySpark cache.
Common Pitfalls
Avoid these mistakes:
- No Watermark: Unbounded state growth. Solution: Always set watermark for stateful jobs.
- Incorrect Delay: Too short discards valid data; too long bloats state. Solution: Test delay with data patterns.
- Missing Checkpointing: Loses state on failure. Solution: Configure checkpointLocation.
- Invalid Timestamps: Breaks watermark logic. Solution: Validate schema.
Monitoring and Validation
Ensure watermarking works:
- Spark UI: Streaming tab shows batch latency and state size.
- Execution Plans: Verify watermark in explain().
- Logs: Check for state cleanup or errors PySpark logging.
- Output: Confirm late data inclusion and state accuracy.
Next Steps
Continue exploring Spark Streaming with:
- Stateful streaming Spark stateful vs stateless streaming.
- Kafka integration Spark how to build streaming app using Kafka.
- Optimization Spark how to optimize jobs for max performance.
Try the Databricks Community Edition for practice.
By mastering watermarking, you’ll build streaming pipelines that handle late data gracefully, delivering accurate, scalable real-time analytics.