Building a Real-Time Streaming App with Spark and Kafka: A Step-by-Step Guide
Apache Spark’s Structured Streaming, combined with Apache Kafka’s robust messaging system, forms a powerful duo for processing real-time data at scale. Whether you’re analyzing live transactions, monitoring IoT sensors, or tracking social media trends, a Spark-Kafka streaming application can deliver insights as data flows in. This comprehensive guide walks you through building a streaming app that reads data from Kafka, processes it with Spark, and outputs results to various sinks. With practical examples in Scala and PySpark, you’ll learn the end-to-end process—from setup to deployment—along with best practices to ensure reliability and performance.
The Power of Spark and Kafka Integration
Real-time data processing is critical for applications requiring immediate action, such as fraud detection, live dashboards, or recommendation systems. Spark’s Structured Streaming processes continuous data as micro-batches, leveraging its distributed architecture for scalability and fault tolerance. Kafka, a distributed messaging platform, excels at handling high-throughput, durable data streams, making it an ideal source for Spark Streaming.
Together, they offer:
- Scalability: Kafka’s partitioned topics and Spark’s distributed processing handle massive data volumes.
- Reliability: Kafka’s durable, replayable log and Spark’s checkpointing provide end-to-end fault tolerance.
- Flexibility: Spark’s DataFrame API supports complex transformations, while Kafka integrates with diverse systems.
- Low Latency: Micro-batches enable near-real-time processing.
This guide focuses on Structured Streaming, the modern API for Spark Streaming, which builds on DataFrames and integrates seamlessly with Kafka (PySpark structured streaming overview). For a streaming primer, see Spark streaming getting started.
Understanding Spark-Kafka Streaming
A Spark-Kafka streaming application typically follows this flow:
- Producers Publish Data: Events (e.g., JSON messages) are published to a Kafka topic.
- Spark Reads Stream: Structured Streaming consumes messages from the topic as a DataFrame.
- Processing: Spark applies transformations like filtering, aggregating, or joining.
- Output: Results are written to sinks (e.g., console, files, databases).
- Checkpointing: State and offsets are saved for fault tolerance (PySpark streaming checkpointing).
Kafka organizes data into topics, partitioned for parallelism, and Spark reads these partitions as a streaming DataFrame. The integration relies on the spark-sql-kafka connector, enabling seamless data flow.
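Before any parsing, the Kafka source exposes each record as a fixed set of columns (key, value, topic, partition, offset, timestamp, timestampType). As a quick sanity check, assuming an active SparkSession named spark with the connector on the classpath (both are configured in the next section), you can print that schema:
# Inspect the columns provided by the Kafka source
raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "transactions") \
    .load()
raw.printSchema()  # key and value arrive as binary; CAST value to STRING before parsing the JSON payload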
Setting Up the Environment
Before building the app, let’s configure Spark and Kafka.
Prerequisites
- Spark Installation:
- Install Spark 3.5.x or later (see PySpark installation).
- Verify:
spark-shell
- Kafka Installation:
- Download Kafka (e.g., 3.7.x) from kafka.apache.org.
- Start ZooKeeper and Kafka (in separate terminals):
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
- Create a topic:
bin/kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092 --partitions 4 --replication-factor 1
- Kafka Connector:
- Include org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0.
- For PySpark, add it via the spark.jars.packages configuration (as in the code below) or the --packages flag of spark-submit.
- For Scala, include in SBT/Maven:
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.0"
- Data Producer:
- Simulate streaming data with Kafka’s console producer (a scripted Python producer is sketched after the sample input):
bin/kafka-console-producer.sh --topic transactions --bootstrap-server localhost:9092
Sample JSON input:
{"order_id": 1, "amount": 100, "region": "North", "timestamp": "2024-10-01T10:00:00"}
{"order_id": 2, "amount": 200, "region": "South", "timestamp": "2024-10-01T10:00:01"}
Building a Spark-Kafka Streaming Application
We’ll create a streaming app that:
- Reads sales transactions from a Kafka topic.
- Filters high-value orders and aggregates totals by region.
- Handles late data with watermarking.
- Writes results to console and Parquet files.
We’ll provide examples in PySpark and Scala, using Structured Streaming.
PySpark Streaming Application
This app processes transactions, applies a watermark, and outputs aggregates to multiple sinks.
Code
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, sum as sum_, window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("KafkaStreamingApp") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/kafka_checkpoint") \
    .config("spark.sql.shuffle.partitions", "10") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

# Define schema
schema = StructType([
    StructField("order_id", IntegerType()),
    StructField("amount", IntegerType()),
    StructField("region", StringType()),
    StructField("timestamp", TimestampType())
])

# Read from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "transactions") \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", "1000") \
    .load()

# Parse JSON
parsed_df = df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

# Apply watermark and filter
watermarked_df = parsed_df.withWatermark("timestamp", "10 minutes")
filtered_df = watermarked_df.filter(col("amount") > 150)

# Aggregate by region
aggregated_df = filtered_df.groupBy("region").agg(sum_("amount").alias("total_amount"))

# Aggregate by 5-minute windows
windowed_df = filtered_df.groupBy(
    window(col("timestamp"), "5 minutes"),
    col("region")
).agg(sum_("amount").alias("window_amount"))

# Write to console
console_query = aggregated_df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime="10 seconds") \
    .start()

# Write to Parquet
parquet_query = windowed_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3://bucket/output") \
    .partitionBy("region") \
    .option("checkpointLocation", "/tmp/parquet_checkpoint") \
    .trigger(processingTime="10 seconds") \
    .start()

# Wait for termination
spark.streams.awaitAnyTermination()
Parameters Explained
- Spark Session:
- appName: Identifies the job in the UI.
- spark.jars.packages: Adds Kafka connector.
- spark.sql.streaming.checkpointLocation: Default checkpoint location; the console query falls back to it since it does not set one of its own.
- spark.sql.shuffle.partitions: Limits shuffle partitions for efficiency.
- spark.executor.memory: Allocates 4GB per executor (see Spark memory management).
- Read Stream:
- format("kafka"): Kafka source.
- kafka.bootstrap.servers: Kafka address.
- subscribe: Topic name (transactions).
- startingOffsets: Reads from the beginning of the topic for testing; it only applies on the first run, after which checkpointed offsets take over (see the sketch after this list for a production-leaning variation).
- maxOffsetsPerTrigger: Caps the number of records read per micro-batch to control throughput.
- Parsing:
- from_json: Converts JSON to structured columns.
- TimestampType: Enables watermarking.
- Watermarking:
- withWatermark("timestamp", "10 minutes"): Handles late data up to 10 minutes (see PySpark streaming watermarking).
- Processing:
- filter(col("amount") > 150): Keeps high-value orders.
- groupBy("region"): Running sum per region (stateful).
- window(col("timestamp"), "5 minutes"): 5-minute windowed aggregates (see PySpark streaming windowing).
- Write Streams:
- Console:
- outputMode("complete"): Full state table for aggregations.
- trigger(processingTime="10 seconds"): Processes every 10 seconds (see PySpark streaming triggers).
- Parquet:
- outputMode("append"): Outputs finalized windows.
- partitionBy("region"): Partitions output for efficiency (see PySpark write Parquet).
- Separate checkpointLocation for the Parquet sink.
- Termination:
- spark.streams.awaitAnyTermination(): Waits for any query to stop.
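As noted for startingOffsets, earliest offsets and a modest rate cap suit local testing. The variation below sketches read options more typical of a long-running deployment; the broker list, topic pattern, and limits are illustrative values, not recommendations.
# Kafka source options leaning toward production use (all values illustrative)
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
    .option("subscribePattern", "transactions.*") \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("maxOffsetsPerTrigger", "10000") \
    .load()
Here subscribePattern subscribes to every topic matching the regex, and failOnDataLoss set to false keeps the query alive if old offsets have already been aged out of Kafka.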
Output
For input:
{"order_id": 1, "amount": 100, "region": "North", "timestamp": "2024-10-01T10:00:00"}
{"order_id": 2, "amount": 200, "region": "South", "timestamp": "2024-10-01T10:02:00"}
{"order_id": 3, "amount": 300, "region": "North", "timestamp": "2024-10-01T10:03:00"}
Console output (running totals):
-------------------------------------------
Batch: 1
-------------------------------------------
+------+------------+
|region|total_amount|
+------+------------+
|South |         200|
|North |         300|
+------+------------+
Parquet output (windowed aggregates, read back from s3://bucket/output/, which is partitioned by region):
|window                                    |region|window_amount|
|{2024-10-01 10:00:00, 2024-10-01 10:05:00}|North |300          |
|{2024-10-01 10:00:00, 2024-10-01 10:05:00}|South |200          |
In append mode these rows appear only after the watermark advances past each window’s end.
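To spot-check what the Parquet sink has written, you can read the output back with a plain batch query. A minimal sketch, reusing the path from the example (substitute a local directory such as /tmp/output if you are not writing to S3):
# Batch read-back of the streaming Parquet output
result = spark.read.parquet("s3://bucket/output")
result.orderBy("window", "region").show(truncate=False)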
Scala Streaming Application
The same pipeline in Scala:
Code
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json, sum, window}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.streaming.Trigger

object KafkaStreamingApp {
  def main(args: Array[String]): Unit = {
    // Initialize Spark
    val spark = SparkSession.builder()
      .appName("KafkaStreamingApp")
      .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
      .config("spark.sql.streaming.checkpointLocation", "/tmp/kafka_checkpoint")
      .config("spark.sql.shuffle.partitions", "10")
      .config("spark.executor.memory", "4g")
      .getOrCreate()

    // Define schema
    val schema = StructType(Seq(
      StructField("order_id", IntegerType),
      StructField("amount", IntegerType),
      StructField("region", StringType),
      StructField("timestamp", TimestampType)
    ))

    // Read from Kafka
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "transactions")
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "1000")
      .load()

    // Parse JSON
    val parsedDf = df.selectExpr("CAST(value AS STRING)")
      .select(from_json(col("value"), schema).alias("data"))
      .select("data.*")

    // Watermark and filter
    val watermarkedDf = parsedDf.withWatermark("timestamp", "10 minutes")
    val filteredDf = watermarkedDf.filter(col("amount") > 150)

    // Aggregate by region
    val aggregatedDf = filteredDf.groupBy("region")
      .agg(sum("amount").alias("total_amount"))

    // Aggregate by window
    val windowedDf = filteredDf.groupBy(
      window(col("timestamp"), "5 minutes"),
      col("region")
    ).agg(sum("amount").alias("window_amount"))

    // Write to console
    val consoleQuery = aggregatedDf.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    // Write to Parquet
    val parquetQuery = windowedDf.writeStream
      .outputMode("append")
      .format("parquet")
      .option("path", "s3://bucket/output")
      .partitionBy("region")
      .option("checkpointLocation", "/tmp/parquet_checkpoint")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    // Wait for termination
    spark.streams.awaitAnyTermination()
  }
}
Running the Scala Application
- Package with SBT/Maven.
- Submit:
spark-submit --class KafkaStreamingApp \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
target/your-app.jar
The output matches the PySpark example, with running totals on the console and windowed results in Parquet files.
Alternative Approach: Stateless Streaming
For simpler use cases, you might skip stateful aggregations and watermarking, using stateless streaming to filter or transform data without maintaining state across batches.
PySpark Stateless Example
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder \
    .appName("StatelessKafkaStreaming") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/stateless_checkpoint") \
    .getOrCreate()

schema = StructType([
    StructField("order_id", IntegerType()),
    StructField("amount", IntegerType()),
    StructField("region", StringType()),
    StructField("timestamp", StringType())
])

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "transactions") \
    .option("startingOffsets", "earliest") \
    .load()

parsed_df = df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

filtered_df = parsed_df.filter(col("amount") > 150)

query = filtered_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime="5 seconds") \
    .start()

query.awaitTermination()
This stateless pipeline filters high-value orders without tracking state, reducing complexity but losing cumulative insights (Spark stateful vs stateless streaming).
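A common stateless variant routes the filtered events to another Kafka topic instead of the console. The sketch below illustrates this under the same setup; high_value_orders is a hypothetical output topic, and the Kafka sink needs a string or binary value column plus its own checkpoint location.
# Publish filtered orders to a downstream topic (high_value_orders is hypothetical)
from pyspark.sql.functions import struct, to_json

kafka_query = filtered_df \
    .select(to_json(struct(*[col(c) for c in filtered_df.columns])).alias("value")) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "high_value_orders") \
    .option("checkpointLocation", "/tmp/high_value_checkpoint") \
    .start()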
Debugging and Monitoring
Streaming apps require continuous monitoring:
- Spark UI: The Streaming tab shows batch latency, throughput, and state size (see Spark how to debug Spark applications).
- Kafka Metrics: Monitor topic lag using tools like Kafka Manager.
- Logs: Enable logging to catch errors or warnings (see PySpark logging).
- Checkpointing: Verify state recovery after restarts.
Check the Streaming tab for backlog (input rate vs. processing rate) to ensure the app keeps up.
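The same numbers are available programmatically from the query handles, which is handy for alerting. A small sketch using the console_query handle from the PySpark example:
# Pull streaming metrics from the driver
print(console_query.status)            # whether the query is active and what it is currently doing
progress = console_query.lastProgress  # dict describing the latest micro-batch, or None before the first one
if progress:
    print(progress["numInputRows"],
          progress["inputRowsPerSecond"],
          progress["processedRowsPerSecond"])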
Best Practices
Build robust Spark-Kafka apps with these tips:
- Use Structured Streaming: For built-in optimizations and a simpler API (see PySpark structured streaming overview).
- Enable Checkpointing: Critical for fault tolerance.
- Set Watermarks: Manage late data in stateful jobs (see Spark how watermarking works in Spark Streaming).
- Tune Triggers: Balance latency and throughput; the sketch after this list shows the common trigger modes.
- Optimize Resources: Adjust memory and partitions (see PySpark partitioning strategies).
- Validate Schemas: Ensure incoming JSON matches the expected schema (see PySpark printSchema).
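On trigger tuning, the trigger setting is the main latency/throughput knob. A short sketch of the common modes on writeStream, reusing aggregated_df from the main example (intervals are illustrative):
# Common trigger modes (intervals are illustrative)
writer = aggregated_df.writeStream.outputMode("complete").format("console")
query = writer.trigger(processingTime="30 seconds").start()  # micro-batch every 30 seconds
# writer.trigger(availableNow=True).start()  # drain everything currently available, then stop (Spark 3.3+)
# writer.trigger(once=True).start()          # run a single micro-batch, then stop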
Common Pitfalls
Avoid these errors:
- No Checkpointing: Loses state or offsets. Solution: Set checkpointLocation.
- Unbounded State: Without watermarking. Solution: Use withWatermark.
- High Latency: Large batches or small clusters. Solution: Tune triggers and resources.
- Schema Mismatches: Break parsing (from_json yields nulls). Solution: Validate input data; the sketch after this list shows one way to isolate malformed records.
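For the schema-mismatch pitfall, one lightweight guard is to keep the raw payload next to the parsed struct and split off rows that fail to parse, since from_json returns a null struct for malformed JSON. A minimal sketch building on df and schema from the main example:
# Split well-formed and malformed records
from pyspark.sql.functions import col, from_json

raw_df = df.selectExpr("CAST(value AS STRING) AS raw_value")
with_parsed = raw_df.select("raw_value", from_json(col("raw_value"), schema).alias("data"))

valid_df = with_parsed.filter(col("data").isNotNull()).select("data.*")
invalid_df = with_parsed.filter(col("data").isNull()).select("raw_value")  # route to a dead-letter sink for inspection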
Next Steps
Continue exploring with:
- Stateful streaming (see Spark stateful vs stateless streaming).
- Performance tuning (see Spark how to optimize jobs for max performance).
- Cloud integration (see PySpark with Google Cloud).
Try the Databricks Community Edition for practice.
By building Spark-Kafka streaming apps, you’ll create scalable, reliable pipelines that deliver real-time insights with ease.