Building a Real-Time Streaming App with Spark and Kafka: A Step-by-Step Guide

Apache Spark’s Structured Streaming, combined with Apache Kafka’s robust messaging system, forms a powerful duo for processing real-time data at scale. Whether you’re analyzing live transactions, monitoring IoT sensors, or tracking social media trends, a Spark-Kafka streaming application can deliver insights as data flows in. This comprehensive guide walks you through building a streaming app that reads data from Kafka, processes it with Spark, and outputs results to various sinks. With practical examples in Scala and PySpark, you’ll learn the end-to-end process—from setup to deployment—along with best practices to ensure reliability and performance.

The Power of Spark and Kafka Integration

Real-time data processing is critical for applications requiring immediate action, such as fraud detection, live dashboards, or recommendation systems. Spark’s Structured Streaming processes continuous data as micro-batches, leveraging its distributed architecture for scalability and fault tolerance. Kafka, a distributed messaging platform, excels at handling high-throughput, durable data streams, making it an ideal source for Spark Streaming.

Together, they offer:

  • Scalability: Kafka’s partitioned topics and Spark’s distributed processing handle massive data volumes.
  • Reliability: Kafka’s durability and Spark’s checkpointing ensure no data loss.
  • Flexibility: Spark’s DataFrame API supports complex transformations, while Kafka integrates with diverse systems.
  • Low Latency: Micro-batches enable near-real-time processing.

This guide focuses on Structured Streaming, the modern API for Spark Streaming, which builds on DataFrames and integrates seamlessly with Kafka (PySpark structured streaming overview). For a streaming primer, see Spark streaming getting started.

Understanding Spark-Kafka Streaming

A Spark-Kafka streaming application typically follows this flow:

  1. Kafka Produces Data: Events (e.g., JSON messages) are published to a Kafka topic.
  2. Spark Reads Stream: Structured Streaming consumes messages from the topic as a DataFrame.
  3. Processing: Spark applies transformations like filtering, aggregating, or joining.
  4. Output: Results are written to sinks (e.g., console, files, databases).
  5. Checkpointing: State and offsets are saved for fault tolerance (PySpark streaming checkpointing).

Kafka organizes data into topics, partitioned for parallelism, and Spark reads those partitions as a streaming DataFrame. The integration relies on the spark-sql-kafka-0-10 connector, enabling seamless data flow; a minimal sketch of the loop follows.
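
To make that flow concrete, here is a minimal PySpark sketch of the read-process-write loop. The topic name, broker address, and checkpoint path are placeholders, and the full version of this pipeline is built step by step later in this guide.

from pyspark.sql import SparkSession

# 1. Spark session with the Kafka connector on the classpath
spark = SparkSession.builder \
    .appName("KafkaFlowSkeleton") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

# 2. Read the topic as an unbounded DataFrame (key and value arrive as binary)
raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "transactions") \
    .load()

# 3. Process: expose the message payload as a string (real transformations go here)
events_df = raw_df.selectExpr("CAST(value AS STRING) AS json")

# 4 and 5. Write to a sink with a checkpoint directory for offset and state recovery
query = events_df.writeStream \
    .format("console") \
    .option("checkpointLocation", "/tmp/skeleton_checkpoint") \
    .start()

query.awaitTermination()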

Setting Up the Environment

Before building the app, let’s configure Spark and Kafka.

Prerequisites

  1. Spark Installation:
    • Install Apache Spark (e.g., 3.5.x) from spark.apache.org with a compatible Java runtime (8, 11, or 17), and verify the install with spark-shell or pyspark.
  2. Kafka Installation:
    • Download Kafka (e.g., 3.7.x) from kafka.apache.org.
    • Start ZooKeeper and Kafka:
      bin/zookeeper-server-start.sh config/zookeeper.properties
      bin/kafka-server-start.sh config/server.properties
    • Create a topic:
      bin/kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092 --partitions 4 --replication-factor 1
  3. Kafka Connector:
    • Include org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0.
    • For PySpark, add it via the spark.jars.packages configuration or the --packages flag of spark-submit.
    • For Scala, include it in SBT/Maven:
      libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.0"
  4. Data Producer:
    • Simulate streaming data with Kafka’s console producer:
      bin/kafka-console-producer.sh --topic transactions --bootstrap-server localhost:9092

Sample JSON input:

{"order_id": 1, "amount": 100, "region": "North", "timestamp": "2024-10-01T10:00:00"}
     {"order_id": 2, "amount": 200, "region": "South", "timestamp": "2024-10-01T10:00:01"}

Building a Spark-Kafka Streaming Application

We’ll create a streaming app that:

  • Reads sales transactions from a Kafka topic.
  • Filters high-value orders and aggregates totals by region.
  • Handles late data with watermarking.
  • Writes results to console and Parquet files.

We’ll provide examples in PySpark and Scala, using Structured Streaming.

PySpark Streaming Application

This app processes transactions, applies a watermark, and outputs aggregates to multiple sinks.

Code

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, sum as sum_, window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("KafkaStreamingApp") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/kafka_checkpoint") \
    .config("spark.sql.shuffle.partitions", "10") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

# Define schema
schema = StructType([
    StructField("order_id", IntegerType()),
    StructField("amount", IntegerType()),
    StructField("region", StringType()),
    StructField("timestamp", TimestampType())
])

# Read from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "transactions") \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", "1000") \
    .load()

# Parse JSON
parsed_df = df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

# Apply watermark and filter
watermarked_df = parsed_df.withWatermark("timestamp", "10 minutes")
filtered_df = watermarked_df.filter(col("amount") > 150)

# Aggregate by region
aggregated_df = filtered_df.groupBy("region").agg(sum_("amount").alias("total_amount"))

# Aggregate by 5-minute windows
windowed_df = filtered_df.groupBy(
    window(col("timestamp"), "5 minutes"),
    col("region")
).agg(sum_("amount").alias("window_amount"))

# Write to console
console_query = aggregated_df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime="10 seconds") \
    .start()

# Write to Parquet
parquet_query = windowed_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3://bucket/output") \
    .partitionBy("region") \
    .option("checkpointLocation", "/tmp/parquet_checkpoint") \
    .trigger(processingTime="10 seconds") \
    .start()

# Wait for termination
spark.streams.awaitAnyTermination()
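
To run the application locally, submit it with the Kafka package on the classpath (app.py here is a placeholder for wherever you saved the code above):

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 app.py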

Parameters Explained

  1. Spark Session:
    • appName: Identifies the job in the UI.
    • spark.jars.packages: Adds the Kafka connector.
    • spark.sql.streaming.checkpointLocation: Ensures fault tolerance for the main query.
    • spark.sql.shuffle.partitions: Limits shuffle partitions for efficiency.
    • spark.executor.memory: Allocates 4GB per executor (see Spark memory management).
  2. Read Stream:
    • format("kafka"): Kafka source.
    • kafka.bootstrap.servers: Kafka broker address.
    • subscribe: Topic name (transactions).
    • startingOffsets: Reads from the beginning for testing.
    • maxOffsetsPerTrigger: Limits messages per batch to control throughput.
  3. Parsing:
    • from_json: Converts JSON strings into structured columns.
    • TimestampType: Enables watermarking on the event-time column.
  4. Watermarking:
    • withWatermark("timestamp", "10 minutes"): Tolerates data arriving up to 10 minutes late and lets Spark discard state for older windows.
  5. Processing:
    • filter(col("amount") > 150): Keeps high-value orders.
    • groupBy("region"): Running sum per region (stateful).
    • window(col("timestamp"), "5 minutes"): 5-minute windowed aggregates (see PySpark streaming windowing).
  6. Write Streams:
    • Console:
      • outputMode("complete"): Full state table for aggregations.
      • trigger(processingTime="10 seconds"): Processes a micro-batch every 10 seconds (see PySpark streaming triggers).
    • Parquet:
      • outputMode("append"): Outputs only finalized windows.
      • partitionBy("region"): Partitions output files for efficient reads (see PySpark write Parquet).
      • A separate checkpointLocation for the Parquet sink.
  7. Termination:
    • spark.streams.awaitAnyTermination(): Blocks until any active query stops; a short sketch of inspecting and stopping queries follows this list.
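
As referenced in the last item, here is a minimal sketch of inspecting and stopping the running queries programmatically; the console_query and parquet_query handles are the ones created in the application above.

# List all active streaming queries on this SparkSession
for q in spark.streams.active:
    print(q.id, q.status)

# Stop the sinks gracefully instead of blocking forever
console_query.stop()
parquet_query.stop()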

Output

For input:

{"order_id": 1, "amount": 100, "region": "North", "timestamp": "2024-10-01T10:00:00"}
{"order_id": 2, "amount": 200, "region": "South", "timestamp": "2024-10-01T10:02:00"}
{"order_id": 3, "amount": 300, "region": "North", "timestamp": "2024-10-01T10:03:00"}

Console output (running totals):

-------------------------------------------
Batch: 1
-------------------------------------------
|region|total_amount|
|South |200         |
|North |300         |

Parquet output (windowed, written under s3://bucket/output/ in region=... subdirectories once the watermark passes the end of each window):

|window                                    |region|window_amount|
|[2024-10-01 10:00:00, 2024-10-01 10:05:00]|North |300          |
|[2024-10-01 10:00:00, 2024-10-01 10:05:00]|South |200          |
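
To verify the files, read them back as a static DataFrame once a window has been finalized (a quick sketch; substitute a local path such as /tmp/output if you are not writing to S3):

# The partition column `region` is recovered from the directory layout
result = spark.read.parquet("s3://bucket/output")
result.orderBy("window", "region").show(truncate=False)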

Scala Streaming Application

The same pipeline in Scala:

Code

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json, sum, window}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.streaming.Trigger

object KafkaStreamingApp {
  def main(args: Array[String]): Unit = {
    // Initialize Spark
    val spark = SparkSession.builder()
      .appName("KafkaStreamingApp")
      .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
      .config("spark.sql.streaming.checkpointLocation", "/tmp/kafka_checkpoint")
      .config("spark.sql.shuffle.partitions", "10")
      .config("spark.executor.memory", "4g")
      .getOrCreate()

    // Define schema
    val schema = StructType(Seq(
      StructField("order_id", IntegerType),
      StructField("amount", IntegerType),
      StructField("region", StringType),
      StructField("timestamp", TimestampType)
    ))

    // Read from Kafka
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "transactions")
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "1000")
      .load()

    // Parse JSON
    val parsedDf = df.selectExpr("CAST(value AS STRING)")
      .select(from_json(col("value"), schema).alias("data"))
      .select("data.*")

    // Watermark and filter
    val watermarkedDf = parsedDf.withWatermark("timestamp", "10 minutes")
    val filteredDf = watermarkedDf.filter(col("amount") > 150)

    // Aggregate by region
    val aggregatedDf = filteredDf.groupBy("region")
      .agg(sum("amount").alias("total_amount"))

    // Aggregate by window
    val windowedDf = filteredDf.groupBy(
      window(col("timestamp"), "5 minutes"),
      col("region")
    ).agg(sum("amount").alias("window_amount"))

    // Write to console
    val consoleQuery = aggregatedDf.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    // Write to Parquet
    val parquetQuery = windowedDf.writeStream
      .outputMode("append")
      .format("parquet")
      .option("path", "s3://bucket/output")
      .partitionBy("region")
      .option("checkpointLocation", "/tmp/parquet_checkpoint")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    // Wait for termination
    spark.streams.awaitAnyTermination()
  }
}

Running the Scala Application

  1. Package with SBT/Maven.
  2. Submit:
     spark-submit --class KafkaStreamingApp \
       --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
       target/your-app.jar

The output matches the PySpark example, with running totals on the console and windowed results in Parquet files.

Alternative Approach: Stateless Streaming

For simpler use cases, you might skip stateful aggregations and watermarking, using stateless streaming to filter or transform data without maintaining state across batches.

PySpark Stateless Example

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder \
    .appName("StatelessKafkaStreaming") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/stateless_checkpoint") \
    .getOrCreate()

schema = StructType([
    StructField("order_id", IntegerType()),
    StructField("amount", IntegerType()),
    StructField("region", StringType()),
    StructField("timestamp", StringType())
])

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "transactions") \
    .option("startingOffsets", "earliest") \
    .load()

parsed_df = df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

filtered_df = parsed_df.filter(col("amount") > 150)

query = filtered_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime="5 seconds") \
    .start()

query.awaitTermination()

This stateless pipeline filters high-value orders without tracking state, reducing complexity but losing cumulative insights (Spark stateful vs stateless streaming).

Debugging and Monitoring

Streaming apps require continuous monitoring:

  • Spark UI: The Structured Streaming tab shows batch latency, throughput, and state size (see Spark how to debug Spark applications).
  • Kafka Metrics: Monitor topic lag using tools like Kafka Manager.
  • Logs: Enable driver and executor logging to surface errors and warnings (see PySpark logging).
  • Checkpointing: Verify that state and offsets recover correctly after a restart.

Check the Streaming tab for backlog (input rate vs. processing rate) to make sure the app keeps up; the same metrics are also available programmatically, as sketched below.
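
As mentioned, these rates can be pulled from the StreamingQuery API. A minimal sketch, assuming the console_query handle from the PySpark application above:

import time

# Poll the most recent micro-batch metrics from the driver
while console_query.isActive:
    progress = console_query.lastProgress  # dict for the latest batch, or None before the first one
    if progress:
        print("batch", progress["batchId"],
              "input rows/s", progress.get("inputRowsPerSecond"),
              "processed rows/s", progress.get("processedRowsPerSecond"))
    time.sleep(10)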

Best Practices

Build robust Spark-Kafka apps by applying the practices shown throughout this guide: always set a checkpoint location, bound state with watermarks, cap each micro-batch with maxOffsetsPerTrigger, tune triggers and shuffle partitions to your cluster, and monitor lag continuously.

Common Pitfalls

Avoid these errors:

  • No Checkpointing: Loses state or offsets on restart. Solution: Always set checkpointLocation.
  • Unbounded State: Aggregations without watermarking grow state indefinitely. Solution: Use withWatermark.
  • High Latency: Oversized batches or an undersized cluster let the backlog grow. Solution: Tune triggers, maxOffsetsPerTrigger, and resources.
  • Schema Mismatches: Malformed messages break parsing. Solution: Validate input data, as in the sketch below.
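
For the last pitfall, here is a minimal sketch of defensive parsing, reusing df and schema from the PySpark application above: from_json yields null when a message does not match the schema, so malformed records can be filtered out and, if needed, routed to a dead-letter sink.

from pyspark.sql.functions import col, from_json

parsed = df.selectExpr("CAST(value AS STRING) AS raw") \
    .withColumn("data", from_json(col("raw"), schema))

clean_df = parsed.filter(col("data").isNotNull()).select("data.*")  # well-formed records
bad_df = parsed.filter(col("data").isNull()).select("raw")          # route these to a dead-letter topic or path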

Next Steps

Continue exploring with the official Structured Streaming programming guide and the Structured Streaming + Kafka integration guide on spark.apache.org, and try the Databricks Community Edition for hands-on practice.

By building Spark-Kafka streaming apps, you’ll create scalable, reliable pipelines that deliver real-time insights with ease.