Jobs failing at data shuffle stage with error org.apache.spark.shuffle.FetchFailedException

Analyze the shuffle data distribution across executors and join query strategies.

Written by swetha.nandajan

Last published at: January 31st, 2025

Problem

You have an application with joins or aggregations, which requires data to be reorganized by key. This reorganization happens in a data shuffle stage. Your job fails at the shuffle stage with the following exception.

 

org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 149 (localCheckpoint at MergeIntoMaterializeSource.scala:364) has failed the maximum allowable number of times: 4. Most recent failure reason:
org.apache.spark.shuffle.FetchFailedException

at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1168)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.$anonfun$next$1(ShuffleBlockFetcherIterator.scala:904)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
...
Caused by: java.nio.channels.ClosedChannelException

 

Cause

This issue occurs when a task is unable to fetch the shuffle data that serves as input to its stage. The inability to fetch shuffle data can occur when:  

  • A cluster downsizes during an auto-scaling event before the shuffle data is read.
  • A spot instance is terminated. 
  • An executor becomes unresponsive or runs out of memory. 

 

Cluster downsizing and terminated spot instances cause ongoing shuffle operations in an Apache Spark job to fail because the shuffle data hosted on the terminated executors is lost. 

 

When an executor hosting shuffle data, or the shuffle server, becomes overwhelmed and stops responding to shuffle data requests, that unresponsiveness causes the failure.

 

Solution

First, diagnose which cause is behind your job’s failure. The process is detailed in the Failing jobs or executors removed (AWS | Azure | GCP) documentation. 

 

From there, long-term solutions can focus on optimizing code logic and data layout, and on minimizing data explosion, depending on your context. The following two options specifically address large joins and tasks that handle excessive data. 

  • If large joins cause fetch failures, broadcast the smaller tables to avoid shuffling them (a sketch follows the configurations below).

 

spark.sql.autoBroadcastJoinThreshold <large_value_in_bytes>  
spark.databricks.adaptive.autoBroadcastJoinThreshold <similar_large_value> 
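 

As an illustration, the following PySpark sketch applies these settings per session (assuming the spark object predefined in a Databricks notebook) and broadcasts a small table explicitly. The table names, join key, and 100 MB threshold are hypothetical examples, not values from this article.

from pyspark.sql.functions import broadcast

# Raise the broadcast thresholds so larger tables qualify for broadcast joins.
# 100 MB is an example value; size it to fit comfortably in executor memory.
threshold_bytes = str(100 * 1024 * 1024)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", threshold_bytes)
spark.conf.set("spark.databricks.adaptive.autoBroadcastJoinThreshold", threshold_bytes)

# Hypothetical tables: a large fact table joined to a small dimension table.
fact_df = spark.table("sales_fact")
dim_df = spark.table("store_dim")

# Explicitly broadcasting the small side avoids shuffling the large table for this join.
joined_df = fact_df.join(broadcast(dim_df), "store_id")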

 

  • If tasks handle excessive data, leading to memory issues or disk spills, increase the number of shuffle partitions or set it to auto. Note that for streaming queries, a change to spark.sql.shuffle.partitions only applies when a new query is started; a query resumed from its checkpoint always picks up the previous value.

 

spark.sql.shuffle.partitions <appropriate_value> or auto 
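 

For example, in a notebook session (assuming the predefined spark object and Adaptive Query Execution enabled for the auto setting), the configuration can be applied as in this sketch; the explicit partition count is illustrative only.

# Option 1: let Databricks choose the shuffle partition count automatically (requires AQE).
spark.conf.set("spark.sql.shuffle.partitions", "auto")

# Option 2: set an explicit value, sized so each shuffle partition stays a manageable size.
# 2000 is only an example; derive it from your shuffle data volume.
# spark.conf.set("spark.sql.shuffle.partitions", "2000")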

 

For temporary, short-term mitigations to help your job succeed, consider the following configurations.

  • Use the Spark UI to check whether a failed shuffle stage made significant progress across its attempts. If so, increasing the number of retry attempts may help the job progress past the stage.

 

spark.shuffle.io.maxRetries <higher_value> (default: 3)   
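 

This is a core Spark property rather than a SQL configuration, so on Databricks it is typically set in the cluster’s Spark config before the cluster starts. A minimal sketch for open source Spark, with an example value only:

from pyspark.sql import SparkSession

# Shuffle I/O retry settings must be in place before executors start, so set them
# at cluster or session creation time rather than with spark.conf.set at runtime.
spark = (
    SparkSession.builder
    .config("spark.shuffle.io.maxRetries", "10")  # example value; the default is 3
    .getOrCreate()
)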

 

  • Configure task-level parallelism carefully to avoid underutilizing CPU resources. Increasing spark.task.cpus gives each task more cores and reduces the overall load on the worker, but it also reduces parallelism, so increase the number of workers accordingly (a sketch of the trade-off follows the note below). 

 

spark.task.cpus <value> (1 < value < total cores per worker) 

 

Important

Increasing the spark.task.cpus value without increasing worker count increases job duration.
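 

To make the trade-off concrete, concurrent task slots per worker are roughly the worker cores divided by spark.task.cpus. A small arithmetic sketch with hypothetical cluster numbers:

# Hypothetical cluster: 4 workers with 8 cores each.
worker_cores = 8
workers = 4
task_cpus = 2  # example value for spark.task.cpus

slots_default = workers * worker_cores             # 32 concurrent tasks with spark.task.cpus=1
slots_after = workers * worker_cores // task_cpus  # 16 concurrent tasks after the change

# Keeping 32 concurrent tasks with spark.task.cpus=2 requires doubling the workers to 8,
# which is why job duration grows if the worker count stays flat.
workers_needed = (slots_default * task_cpus) // worker_cores
print(slots_default, slots_after, workers_needed)  # 32 16 8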

 

 

  • Prevent worker-side, network-related timeouts during long garbage collection pauses by increasing the timeout values. 

 

spark.network.timeout = 800s  
spark.rpc.timeout = 600s  
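 

These are also core Spark properties, so set them in the cluster’s Spark config, or at session creation in open source Spark, as in this sketch, which reuses the values shown above:

from pyspark.sql import SparkSession

# Longer timeouts give executors time to respond even during long garbage collection pauses.
spark = (
    SparkSession.builder
    .config("spark.network.timeout", "800s")
    .config("spark.rpc.timeout", "600s")
    .getOrCreate()
)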

 

  • As a final temporary measure, you can increase cluster capacity. Note that this may incur extra costs.