Mastering Apache Spark’s spark.executor.memory Configuration: A Comprehensive Guide
We’ll define spark.executor.memory, detail its configuration in Scala, and provide a practical example—a sales data analysis—to illustrate its impact on performance. We’ll cover all relevant parameters, related settings, and best practices, ensuring a clear understanding of how memory allocation shapes Spark’s execution. By the end, you’ll know how to tune spark.executor.memory for Spark DataFrames and be ready to explore advanced topics like Spark memory management. Let’s dive into the world of Spark executor memory!
What is spark.executor.memory?
The spark.executor.memory configuration property in Apache Spark specifies the amount of memory allocated to each executor process in a Spark application. As outlined in the Apache Spark documentation, this memory is used for computation (e.g., shuffles, joins, aggregations) and storage (e.g., caching, broadcast variables) within executors, which are the worker processes running tasks across the cluster (SparkSession vs. SparkContext). Properly configuring spark.executor.memory is crucial for balancing performance, resource utilization, and stability in distributed Spark jobs.
Key Characteristics
- Executor Heap Memory: Defines the JVM heap size for each executor, shared between execution and storage tasks Spark Executors.
- Performance Driver: Impacts the ability to process large datasets, perform memory-intensive operations, and cache data efficiently Spark How It Works.
- Cluster-Wide: Applies uniformly to all executors, influencing overall application scalability Spark Cluster.
- Configurable: Set via SparkConf, command-line arguments, or configuration files, with defaults suited for small jobs.
- Complementary: Works with related settings like spark.executor.cores and spark.executor.memoryOverhead for holistic resource management Spark Executor Instances.
The spark.executor.memory setting is a cornerstone of Spark’s resource management, directly affecting job efficiency and reliability.
Role of spark.executor.memory in Spark Applications
The spark.executor.memory property plays several critical roles:
- Computation Capacity: Provides memory for executor tasks, such as joins, aggregations, and shuffles, preventing out-of-memory (OOM) errors during complex operations Spark Partitioning Shuffle.
- Data Storage: Allocates space for caching RDDs or DataFrames, improving performance for iterative or repeated computations Spark Caching.
- Resource Utilization: Balances memory across executors to maximize cluster resources, ensuring efficient parallel processing Spark Tasks.
- Scalability: Enables scaling to large datasets by providing sufficient memory for each executor, complementing the number of executors (spark.executor.instances).
- Stability: Prevents job failures due to memory exhaustion, especially in shuffle-heavy or data-intensive workloads Spark How Shuffle Works.
- Monitoring Insight: Influences memory metrics in the Spark UI, helping diagnose performance bottlenecks or memory issues Spark Debug Applications.
Incorrectly setting spark.executor.memory—too low or too high—can lead to OOM errors, garbage collection overhead, or underutilized resources, making it a key tuning parameter.
Configuring spark.executor.memory
The spark.executor.memory property can be set programmatically, via configuration files, or through command-line arguments. Let’s focus on Scala usage and explore each method.
1. Programmatic Configuration
In Scala, spark.executor.memory is set using SparkConf or the SparkSession builder, specifying the memory in a format like "8g" (8 gigabytes) or "8192m" (8192 megabytes).
Example with SparkConf:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
val conf = new SparkConf()
  .setAppName("SalesAnalysis")
  .setMaster("yarn")
  .set("spark.executor.memory", "8g")
val spark = SparkSession.builder()
  .config(conf)
  .getOrCreate()
Example with SparkSession Builder:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("SalesAnalysis")
  .master("yarn")
  .config("spark.executor.memory", "8g")
  .getOrCreate()
Method Details:
- set(key, value) (SparkConf):
- Description: Sets a configuration property.
- Parameters:
- key: "spark.executor.memory".
- value: Memory size (e.g., "8g", "8192m", "8GB").
- Returns: SparkConf for chaining.
- config(key, value) (SparkSession.Builder):
- Description: Sets a configuration property directly.
- Parameters:
- key: "spark.executor.memory".
- value: Memory size (e.g., "8g").
- Returns: SparkSession.Builder for chaining.
Behavior:
- Applies the specified memory to all executors launched by the cluster manager.
- Must be a valid size (e.g., "1g", "512m"); invalid formats cause errors (see the validation sketch after this list).
- Default: 1g (often insufficient for production).
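Because a malformed size string only fails once Spark tries to parse it, it can be worth validating the value before building the session. A minimal sketch in Scala, assuming a hypothetical setExecutorMemory helper and an illustrative regex (neither is part of Spark's API):

import org.apache.spark.SparkConf

// Hypothetical pre-flight check: accept only size strings Spark can parse (e.g., "512m", "8g").
def setExecutorMemory(conf: SparkConf, size: String): SparkConf = {
  val valid = size.toLowerCase.matches("""\d+(k|m|g|t)b?""")
  require(valid, s"Invalid spark.executor.memory value: $size")
  conf.set("spark.executor.memory", size)
}

val conf = setExecutorMemory(new SparkConf().setAppName("SalesAnalysis"), "8g")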
2. File-Based Configuration
The spark.executor.memory property can be set in spark-defaults.conf (located in $SPARK_HOME/conf), providing a default value unless overridden.
Example (spark-defaults.conf):
spark.master yarn
spark.executor.memory 4g
spark.app.name DefaultAnalysis
Behavior:
- Loaded automatically when SparkConf or SparkSession is initialized.
- Overridden by programmatic or command-line settings.
- Useful for cluster-wide defaults but less common for job-specific tuning.
3. Command-Line Configuration
The spark.executor.memory property can also be specified via spark-submit or spark-shell, offering flexibility for dynamic tuning.
Example:
spark-submit --class SalesAnalysis --master yarn \
--conf spark.executor.memory=8g \
SalesAnalysis.jar
Shorthand Option:
spark-submit --class SalesAnalysis --master yarn \
--executor-memory 8g \
SalesAnalysis.jar
Behavior:
- Takes precedence over spark-defaults.conf but is overridden by programmatic settings.
- Ideal for scripts, CI/CD pipelines, or ad-hoc jobs requiring specific memory settings.
Precedence Order:
1. Programmatic (SparkConf.set or SparkSession.config).
2. Command-line (--conf spark.executor.memory or --executor-memory).
3. spark-defaults.conf.
4. Default (1g).
Practical Example: Sales Data Analysis
Let’s illustrate spark.executor.memory with a sales data analysis, processing sales.csv (columns: order_id, customer_id, product, amount, order_date) to compute total sales per customer, joined with a customers table (customers.csv: customer_id, name). We’ll configure spark.executor.memory on a YARN cluster to handle a memory-intensive workload, demonstrating its impact.
Code Example
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object SalesAnalysis {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SalesAnalysis_2025_04_12")
      .setMaster("yarn")
      .set("spark.executor.memory", "8g")
      .set("spark.executor.cores", "4")
      .set("spark.executor.instances", "10")
      .set("spark.executor.memoryOverhead", "1g")
      .set("spark.driver.memory", "4g")
      .set("spark.driver.cores", "2")
      .set("spark.sql.shuffle.partitions", "100")
      .set("spark.task.maxFailures", "4")
      .set("spark.memory.fraction", "0.6")
      .set("spark.memory.storageFraction", "0.5")
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "hdfs://namenode:9001/logs")
      .set("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000")
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    // Read data
    val salesDF = spark.read.option("header", "true").option("inferSchema", "true")
      .csv("hdfs://namenode:9000/sales.csv")
    val customersDF = spark.read.option("header", "true").option("inferSchema", "true")
      .csv("hdfs://namenode:9000/customers.csv")
    // Cache sales data for reuse
    salesDF.cache()
    // Join and aggregate
    val resultDF = salesDF.filter(col("amount") > 100)
      .join(customersDF, "customer_id")
      .groupBy(salesDF("customer_id"), customersDF("name"))
      .agg(sum("amount").alias("total_sales"))
    // Save output
    resultDF.write.mode("overwrite").save("hdfs://namenode:9000/output")
    spark.stop()
  }
}
Parameters:
- setAppName(name): Sets the application name for identification Spark Set App Name.
- setMaster(url): Configures YARN as the cluster manager Spark Application Set Master.
- set("spark.executor.memory", value): Allocates 8GB per executor, sufficient for joins and caching.
- set(key, value): Configures executor cores, instances, driver resources, parallelism, fault tolerance, memory management, shuffling, and logging, as detailed in SparkConf.
- read.csv(path): Reads CSV file Spark DataFrame.
- path: HDFS path.
- option(key, value): E.g., "header", "true", "inferSchema", "true".
- cache(): Persists DataFrame in memory Spark Caching.
- filter(condition): Filters rows Spark DataFrame Filter.
- condition: Boolean expression (e.g., col("amount") > 100).
- join(other, on): Joins DataFrames Spark DataFrame Join.
- other: Target DataFrame.
- on: Join key (e.g., "customer_id").
- groupBy(cols): Groups data Spark Group By.
- cols: Column names (e.g., "customer_id", "name").
- agg(expr): Aggregates data Spark DataFrame Aggregations.
- expr: E.g., sum("amount").alias("total_sales").
- write.mode(saveMode).save(path): Saves output Spark DataFrame Write.
- path: Output path.
- saveMode: E.g., "overwrite".
Job Submission
Submit the job with spark-submit; the command-line flags mirror the programmatic settings (which take precedence if the two ever differ):
spark-submit --class SalesAnalysis --master yarn --deploy-mode cluster \
--executor-memory 8g \
--conf spark.app.name=SalesAnalysis_2025_04_12 \
--conf spark.executor.cores=4 \
--conf spark.executor.instances=10 \
--conf spark.executor.memoryOverhead=1g \
--conf spark.driver.memory=4g \
--conf spark.driver.cores=2 \
--conf spark.sql.shuffle.partitions=100 \
--conf spark.task.maxFailures=4 \
--conf spark.memory.fraction=0.6 \
--conf spark.memory.storageFraction=0.5 \
--conf spark.shuffle.service.enabled=true \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=hdfs://namenode:9001/logs \
SalesAnalysis.jar
Execution:
- Driver Initialization: The driver creates a SparkSession with spark.executor.memory=8g, connecting to YARN’s ResourceManager Spark Driver Program.
- Resource Allocation: YARN allocates 10 executors, each with 8GB heap memory, 4 cores, and 1GB overhead (spark.executor.memoryOverhead), plus a driver with 4GB memory and 2 cores.
- Data Reading: Reads sales.csv and customers.csv into DataFrames, partitioning based on HDFS block size (e.g., 8 partitions for a 1GB file) Spark Partitioning.
- Caching: salesDF.cache() stores the DataFrame in memory across executors. With spark.memory.fraction=0.6 and spark.memory.storageFraction=0.5, roughly ~4.8GB (8g × 0.6, slightly less after Spark's ~300MB reserved heap) is available for execution/storage, with ~2.4GB of that protected for caching, avoiding spills to disk Spark Memory Management (a verification sketch follows this list).
- Processing: Filters rows (amount > 100), joins DataFrames, groups by customer_id and name, and aggregates sums. The join and groupBy trigger shuffles, managed by spark.sql.shuffle.partitions=100 and spark.shuffle.service.enabled=true to optimize data transfer Spark Partitioning Shuffle.
- Memory Usage:
- Execution Memory: Handles shuffle buffers and join operations, supported by 8GB per executor to avoid OOM errors during memory-intensive tasks.
- Storage Memory: Stores cached salesDF partitions, enabling fast access for the join and aggregation.
- Overhead: spark.executor.memoryOverhead=1g covers JVM metadata and off-heap needs, ensuring stability.
- Fault Tolerance: spark.task.maxFailures=4 retries failed tasks, protecting against transient memory issues Spark Task Max Failures.
- Monitoring: The Spark UI (http://driver-host:4040) shows memory usage per executor, with "SalesAnalysis_2025_04_12" identifying the job. YARN’s UI (http://namenode:8088) tracks executor memory allocation, and logs in hdfs://namenode:9001/logs detail memory-related events Spark Debug Applications.
- Output: Writes results to hdfs://namenode:9000/output as 100 partitioned files, reflecting shuffle settings.
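Whether the caching step actually took effect can be confirmed programmatically. A minimal sketch, assuming the salesDF from the example above and that an action has run to materialize the cache (Dataset.cache() defaults to the MEMORY_AND_DISK storage level):

// Materialize the cache with an action, then inspect the storage level.
salesDF.count()
println(s"salesDF storage level: ${salesDF.storageLevel}")

The Spark UI's "Storage" tab reports the same information, including how many partitions are held in memory versus on disk.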
Output (hypothetical):
+------------+------+-----------+
|customer_id |name |total_sales|
+------------+------+-----------+
| C1 |Alice | 1200.0|
| C2 |Bob | 600.0|
+------------+------+-----------+
Impact of spark.executor.memory
- Performance: The 8GB per executor supports memory-intensive operations (join, shuffle, cache), preventing spills to disk that would slow execution. For a 1GB dataset, 8GB leaves ample headroom for shuffles (~10MB per shuffle partition × 100 partitions ≈ 1GB of shuffle data).
- Stability: Avoids OOM errors during the groupBy and join, which require significant memory for shuffle buffers and intermediate data.
- Caching Efficiency: Enables caching salesDF (~1GB) across 10 executors, with ~2.4GB storage memory per executor, speeding up the join and aggregation.
- Resource Balance: Pairs with spark.executor.cores=4 and spark.executor.instances=10 to utilize 40 cores and 80GB total heap memory, matching the cluster’s capacity.
- Monitoring: The Spark UI’s “Executors” tab shows ~4.8GB available per executor for execution/storage, with no spills, confirming spark.executor.memory=8g is sufficient (the arithmetic behind these figures is sketched below).
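The per-executor figures quoted above follow from Spark's unified memory model: the usable pool is (heap minus ~300MB reserved) × spark.memory.fraction, and spark.memory.storageFraction is the share of that pool protected for storage. A minimal sketch of the arithmetic (illustrative only; exact numbers vary slightly by Spark version):

// Rough unified-memory arithmetic for an 8g executor with default fractions.
val heapMb          = 8 * 1024   // spark.executor.memory = 8g
val reservedMb      = 300        // fixed reservation inside the executor JVM
val memoryFraction  = 0.6        // spark.memory.fraction
val storageFraction = 0.5        // spark.memory.storageFraction
val unifiedMb = (heapMb - reservedMb) * memoryFraction   // ~4735 MB for execution + storage
val storageMb = unifiedMb * storageFraction              // ~2368 MB protected for caching
println(f"Unified pool: $unifiedMb%.0f MB, storage share: $storageMb%.0f MB")

This is also why the ~4.8GB figure is an approximation: the ~300MB reservation brings the actual unified pool closer to ~4.6GB.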
Best Practices for Setting spark.executor.memory
To optimize spark.executor.memory, follow these best practices:
- Size Based on Workload:
- Allocate 4–16GB for typical jobs, 32–64GB for memory-intensive tasks (e.g., large joins, ML).
- Example: .set("spark.executor.memory", "8g") for a 1GB dataset with joins.
- Consider dataset size, shuffle intensity, and caching needs.
- Balance with Cores:
- Pair with spark.executor.cores (e.g., 4–8 cores for 8–16GB) to ensure ~1–2GB per task (see the sizing sketch after this list).
- Example: .set("spark.executor.cores", "4") with .set("spark.executor.memory", "8g").
- Account for Overhead:
- Set spark.executor.memoryOverhead to 10–20% of spark.executor.memory (e.g., 1GB for 8GB).
- Example: .set("spark.executor.memoryOverhead", "1g").
- Optimize Memory Fractions:
- Use spark.memory.fraction=0.6 for balanced execution/storage, adjusting to 0.75 for shuffle-heavy jobs.
- Set spark.memory.storageFraction=0.5 for caching, reducing to 0.3 if execution dominates.
- Example: .set("spark.memory.fraction", "0.6").
- Scale with Executors:
- Increase spark.executor.instances rather than spark.executor.memory for large clusters to maximize parallelism.
- Example: Prefer 20 executors at 8GB over 10 at 16GB for a 100-core cluster.
- Monitor Usage:
- Check the Spark UI’s “Executors” tab for memory consumption, spills, or OOM errors Spark Debug Applications.
- Adjust spark.executor.memory if executors exceed heap limits or underutilize resources.
- Example: Increase to 12g if spills occur at 8g.
- Test Incrementally:
- Start with 4–8GB in development (local[*], where the single local JVM is actually sized by spark.driver.memory), scaling up for production (YARN, Kubernetes).
- Example: Test with .set("spark.executor.memory", "4g"), deploy with "8g".
- Avoid Over-Allocation:
- Don’t set excessively high memory (e.g., 128GB) unless justified, as it reduces executor count and parallelism.
- Example: Use .set("spark.executor.memory", "16g") for a 100GB dataset, not 64g.
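The sizing rules above can be captured in a small helper. A minimal sketch, assuming a target of roughly 2GB of heap per task and Spark's default overhead rule of max(10% of executor memory, 384MB); the sizeExecutor function is illustrative, not a Spark API:

// Hypothetical helper: derive executor memory and overhead from cores per executor.
def sizeExecutor(coresPerExecutor: Int, gbPerTask: Int = 2): (String, String) = {
  val heapGb     = coresPerExecutor * gbPerTask                  // e.g., 4 cores x 2GB = 8GB
  val overheadMb = math.max((heapGb * 1024 * 0.10).toInt, 384)   // default overhead rule
  (s"${heapGb}g", s"${overheadMb}m")
}
val (memory, overhead) = sizeExecutor(coresPerExecutor = 4)
// memory = "8g", overhead = "819m"; rounding overhead up (e.g., to "1g") as in the example is reasonable.

This mirrors the example's configuration of an 8GB heap with 1GB overhead for 4-core executors.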
Debugging and Monitoring with spark.executor.memory
The spark.executor.memory setting shapes debugging and monitoring:
- Spark UI: The “Executors” tab at http://driver-host:4040 shows memory usage per executor (e.g., ~4.8GB for execution/storage at 8g), cache size (~2.4GB), and shuffle spills, helping diagnose OOM or inefficiency Spark Debug Applications.
- YARN UI: At http://namenode:8088, displays executor memory allocation (8GB + 1GB overhead × 10), confirming resource usage.
- Logs: Event logs in hdfs://namenode:9001/logs (if spark.eventLog.enabled=true) include memory-related errors (e.g., GC pauses, OOM), filterable by "SalesAnalysis_2025_04_12" Spark Log Configurations.
- Verification: Check active memory setting:
println(s"Executor Memory: ${spark.sparkContext.getConf.get("spark.executor.memory")}")
Example:
- If the Spark UI shows frequent spills in the “Stages” tab during groupBy, increase spark.executor.memory to 12g or reduce spark.sql.shuffle.partitions.
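The memory figures shown in the “Executors” tab are also exposed by Spark's monitoring REST API, which is convenient for scripted checks. A minimal sketch, assuming the default UI port 4040, a reachable driver-host, and a running SparkSession named spark:

import scala.io.Source

// Query the running application's executor metrics (memoryUsed, maxMemory, ...).
val appId = spark.sparkContext.applicationId
val url   = s"http://driver-host:4040/api/v1/applications/$appId/executors"
println(Source.fromURL(url).mkString)   // inspect memoryUsed vs. maxMemory per executor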
Common Pitfalls and How to Avoid Them
- Insufficient Memory:
- Issue: Low spark.executor.memory (e.g., 1g) causes OOM errors for joins or shuffles.
- Solution: Increase to 8–16GB, monitor spills in Spark UI.
- Example: .set("spark.executor.memory", "8g").
- Over-Allocation:
- Issue: High memory (e.g., 64g) reduces executor count, limiting parallelism.
- Solution: Use moderate sizes (8–16GB), scale with spark.executor.instances.
- Example: .set("spark.executor.memory", "12g") with .set("spark.executor.instances", "20").
- Ignoring Overhead:
- Issue: Missing spark.executor.memoryOverhead leads to JVM failures.
- Solution: Set to 10–20% of spark.executor.memory.
- Example: .set("spark.executor.memoryOverhead", "1g").
- Unbalanced Fractions:
- Issue: Low spark.memory.fraction starves tasks; high spark.memory.storageFraction limits execution.
- Solution: Use 0.6 and 0.5 defaults, adjust for workload.
- Example: .set("spark.memory.fraction", "0.6").
- Ignoring Cluster Limits:
- Issue: Setting spark.executor.memory beyond node capacity causes allocation failures.
- Solution: Check cluster specs (e.g., 32GB/node) and allocate conservatively (a capacity check is sketched after this list).
- Example: Use 8g for nodes with 32GB total.
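The last pitfall can be caught before submission with a quick capacity check: heap plus overhead for every executor packed onto a node must fit within what the node (and its YARN NodeManager) can offer. A minimal sketch, with the node and executor sizes as illustrative assumptions:

// Hypothetical pre-flight check: does the requested executor footprint fit the node?
val nodeMemoryGb     = 32.0   // physical memory per worker node (assumption)
val executorMemoryGb = 8.0    // spark.executor.memory
val overheadGb       = 1.0    // spark.executor.memoryOverhead
val executorsPerNode = 3      // executors expected per node
val footprintGb = (executorMemoryGb + overheadGb) * executorsPerNode
require(footprintGb <= nodeMemoryGb * 0.9,   // leave ~10% for the OS and other daemons
  f"Executors need $footprintGb%.1f GB but the node offers only ~${nodeMemoryGb * 0.9}%.1f GB")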
Advanced Usage
For advanced scenarios, spark.executor.memory can be dynamically tuned:
- Dynamic Adjustment:
- Use workload parameters to set memory (e.g., dataset size).
- Example:
val dataSizeGB = estimateDataSize() // Custom function
val memory = if (dataSizeGB > 100) "16g" else "8g"
conf.set("spark.executor.memory", memory)
- Pipeline Optimization:
- Adjust memory per stage in multi-stage pipelines (e.g., higher for joins).
- Example: Separate SparkConf objects for join-heavy vs. filter-heavy jobs (see the sketch after this list).
- Cloud Environments:
- In Kubernetes or AWS EMR, align spark.executor.memory with instance types (e.g., 8GB for m5.xlarge).
- Example: .set("spark.executor.memory", "8g") for 32GB nodes.
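Because executor memory is fixed for the lifetime of an application, per-stage tuning in practice means launching each pipeline stage as its own job with its own SparkConf. A minimal sketch of that idea (the confFor helper and the profile values are illustrative):

import org.apache.spark.SparkConf

// Hypothetical helper: one memory profile per pipeline stage, each run as a separate job.
def confFor(stage: String): SparkConf = {
  val base = new SparkConf().setAppName(s"SalesPipeline-$stage").setMaster("yarn")
  stage match {
    case "join"   => base.set("spark.executor.memory", "16g").set("spark.executor.memoryOverhead", "2g")
    case "filter" => base.set("spark.executor.memory", "4g").set("spark.executor.memoryOverhead", "512m")
    case _        => base.set("spark.executor.memory", "8g").set("spark.executor.memoryOverhead", "1g")
  }
}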
Next Steps
You’ve now mastered spark.executor.memory, understanding its role, configuration, and optimization. To deepen your knowledge:
- Learn Spark Memory Management for deeper memory insights.
- Explore Spark Executors for executor mechanics.
- Dive into Spark Partitioning for parallelism tuning.
- Optimize with Spark Performance Techniques.
With this foundation, you’re ready to tune Spark applications for peak performance. Happy optimizing!