Problem
You have an application with joins or aggregations, which require your data to be reorganized by key. This reorganization requires a data shuffle stage. Your job fails at this shuffle stage with the following exception.
org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 149 (localCheckpoint at MergeIntoMaterializeSource.scala:364) has failed the maximum allowable number of times: 4. Most recent failure reason:
org.apache.spark.shuffle.FetchFailedException
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1168)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.$anonfun$next$1(ShuffleBlockFetcherIterator.scala:904)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at
Caused by: java.nio.channels.ClosedChannelException
Cause
This issue occurs when a task is unable to fetch the shuffle data that serves as input to the stage. The inability to fetch shuffle data can occur when:
- A cluster downsizes during an auto-scaling event before the shuffle data is read.
- A spot instance is terminated.
- An executor becomes unresponsive or runs out of memory.
Cluster downsizing and terminated spot instances cause ongoing shuffle operations in an Apache Spark job to fail because the data held by the terminated executors is lost.
When an executor hosting shuffle data, or the shuffle server, becomes overwhelmed and stops responding to shuffle data requests, that unresponsiveness causes the failure.
Solution
First, diagnose which cause is behind your job’s failure. The process is detailed in the Failing jobs or executors removed (AWS | Azure | GCP) documentation.
From there, long-term solutions can focus on optimizing code logic, improving data layout, and minimizing data explosions, depending on your context. The following two options specifically address large joins and tasks that handle excessive data; a combined configuration sketch follows the list.
- If large joins cause fetch failures, broadcast smaller tables to avoid shuffles.
spark.sql.autoBroadcastJoinThreshold <large_value_in_bytes>
spark.databricks.adaptive.autoBroadcastJoinThreshold <similar_large_value>
- If tasks handle excessive data, leading to memory issues or disk spills, increase the number of shuffle partitions or set it to auto. Note that changing
spark.sql.shuffle.partitions
only applies when a new query is started; a resumed query always picks up the previous value.
spark.sql.shuffle.partitions <appropriate_value> or auto
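The two options above can be combined in a notebook session. The following PySpark sketch is illustrative only: the orders and customers table names, the customer_id join key, and the 100 MB threshold are hypothetical placeholders, and the auto setting assumes adaptive query execution is enabled on the cluster.

# Illustrative PySpark sketch of the two long-term options. Table names,
# the join key, and the threshold values are hypothetical placeholders.
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

# Option 1: raise the broadcast thresholds (values are in bytes; 100 MB here
# is only an example) so the smaller table is broadcast instead of shuffled.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))
spark.conf.set("spark.databricks.adaptive.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))

# You can also request the broadcast explicitly with a join hint.
orders = spark.table("orders")        # large fact table (hypothetical)
customers = spark.table("customers")  # small dimension table (hypothetical)
joined = orders.join(broadcast(customers), "customer_id")

# Option 2: increase the number of shuffle partitions, or use "auto".
# This only applies to queries started after the setting changes.
spark.conf.set("spark.sql.shuffle.partitions", "auto")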
For temporary, short-term solutions to help your job succeed, consider the following configurations. A combined sketch of these settings follows the list.
- Use the Spark UI to check whether the failed shuffle stage made significant progress across its attempts. If so, increasing the number of retry attempts may help the job progress past the stage.
spark.shuffle.io.maxRetries <higher_value> (default: 3)
- Configure task-level parallelism carefully to avoid underutilized CPU resources. Note that
spark.task.cpus
increases the number of cores per task and reduces the overall load on the worker. This reduces parallelism, so increase the number of workers accordingly.
spark.task.cpus <value> where value > 1 but < total_cores
Important
Increasing the spark.task.cpus value without increasing the worker count increases job duration.
- Prevent worker-side, network-related timeouts during large garbage collection pauses by increasing the timeout values.
spark.network.timeout = 800s
spark.rpc.timeout = 600s
- As a final temporary measure, you can increase cluster capacity. Note that this may incur extra costs.
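If you manage how the Spark application is launched, the sketch below gathers the short-term settings from this list in one place. It is illustrative only: the values are examples, and because these settings are read when the application starts, on Databricks you would typically put them in the cluster's Spark config rather than set them from a running notebook.

# Illustrative PySpark sketch of the short-term settings above. The values
# shown are examples only; tune them for your workload, and remember that
# these settings take effect at application startup.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Retry shuffle fetches more times before the stage is marked as failed.
    .config("spark.shuffle.io.maxRetries", "10")
    # More cores per task lowers per-worker load but reduces parallelism,
    # so add workers to compensate.
    .config("spark.task.cpus", "2")
    # Longer network and RPC timeouts help ride out long GC pauses.
    .config("spark.network.timeout", "800s")
    .config("spark.rpc.timeout", "600s")
    .getOrCreate()
)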