Problem
If your application contains any aggregation or join stages, execution requires a Spark shuffle stage. Depending on the specific configuration, if you run multiple streaming queries on an interactive cluster, you may get a shuffle FetchFailedException error.
ShuffleMapStage has failed the maximum allowable number of times
DAGScheduler: ShuffleMapStage 499453 (start at command-39573728:13) failed in 468.820 s due to org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 228703
org.apache.spark.shuffle.FetchFailedException: Connection reset by peer
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:747)
Caused by: java.io.IOException: Connection reset by peer
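Why do aggregations and joins need a shuffle? Rows with the same key can start out in different partitions, so before they can be combined they must be moved to the partition that owns that key. The pure-Python toy model below sketches that idea. It is not Spark's implementation: Spark SQL's hash partitioning uses Murmur3 hashing, while this sketch uses `zlib.crc32` only to stay deterministic. The function names `hash_shuffle` and `sum_by_key` are hypothetical.

```python
import zlib
from collections import defaultdict


def hash_shuffle(map_outputs, num_partitions):
    """Route every (key, value) record from every map task to the reduce
    partition that owns its key (toy hash partitioning)."""
    reduce_inputs = [defaultdict(list) for _ in range(num_partitions)]
    for records in map_outputs:  # one list of records per map task
        for key, value in records:
            partition = zlib.crc32(key.encode("utf-8")) % num_partitions
            reduce_inputs[partition][key].append(value)
    return reduce_inputs


def sum_by_key(map_outputs, num_partitions=2):
    """Toy groupBy(key).sum(): shuffle first, then aggregate per partition."""
    totals = {}
    for partition in hash_shuffle(map_outputs, num_partitions):
        for key, values in partition.items():
            # After the shuffle, every value for this key is in this partition.
            totals[key] = sum(values)
    return totals


# Two "map tasks" that both hold rows for key "a".
print(sum_by_key([[("a", 1), ("b", 2)], [("a", 3), ("c", 4)]]))
```

The reduce side can only finish once it has read shuffle output from every map task, which is why losing any one map task's output breaks the stage.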
Cause
Shuffle fetch failures usually occur during events such as cluster downscaling, executor loss, or worker decommissioning. In these cases, the shuffle files written by an executor can be lost. When a subsequent task tries to fetch those shuffle files, the fetch fails.
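The failure mode can be pictured with another toy model: a tracker records which executor holds each map task's shuffle output, losing an executor drops those entries, and a later fetch for that output fails. Everything here (`ToyMapOutputTracker`, `MissingShuffleOutput`) is hypothetical and only loosely mirrors the "Missing an output location for shuffle" message in the error above; it is not Spark's API.

```python
class MissingShuffleOutput(Exception):
    """Hypothetical stand-in for a Spark shuffle fetch failure."""


class ToyMapOutputTracker:
    """Remembers which executor holds the output of each map task."""

    def __init__(self):
        self.locations = {}  # map task id -> executor id

    def register(self, map_id, executor_id):
        self.locations[map_id] = executor_id

    def lose_executor(self, executor_id):
        # Shuffle files that lived only on this executor are gone.
        self.locations = {
            m: e for m, e in self.locations.items() if e != executor_id
        }

    def fetch(self, map_id):
        if map_id not in self.locations:
            raise MissingShuffleOutput(
                f"Missing an output location for map task {map_id}"
            )
        return self.locations[map_id]


tracker = ToyMapOutputTracker()
tracker.register(0, "exec-1")
tracker.register(1, "exec-2")
tracker.lose_executor("exec-2")  # e.g. downscaling removed exec-2
try:
    tracker.fetch(1)
except MissingShuffleOutput as err:
    print(err)  # prints: Missing an output location for map task 1
```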
The external shuffle service is enabled by default in Databricks. It preserves the shuffle files written by executors so that the executors can be removed safely.
Run spark.conf.get("spark.shuffle.service.enabled") in a Python or Scala notebook cell to return the current value of the shuffle service setting. If it returns true, the service is enabled.
spark.conf.get("spark.shuffle.service.enabled")
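`spark.conf.get` returns the value as a string, not a Python `bool`. If you want to branch on it in a notebook, a small helper such as the hypothetical `parse_spark_bool` below can interpret it. It roughly follows Spark's own handling of boolean settings (trim the string, then accept only `true` or `false` in any letter case); treat that equivalence as an assumption.

```python
def parse_spark_bool(value: str) -> bool:
    """Interpret a Spark config string such as "true" or "false"."""
    normalized = value.strip().lower()
    if normalized == "true":
        return True
    if normalized == "false":
        return False
    raise ValueError(f"expected 'true' or 'false', got {value!r}")


# In a Databricks notebook, where `spark` is predefined:
# enabled = parse_spark_bool(spark.conf.get("spark.shuffle.service.enabled"))
print(parse_spark_bool("true"))     # prints: True
print(parse_spark_bool(" FALSE "))  # prints: False
```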
Solution
Disable the default Spark Shuffle service.
Disabling the shuffle service does not prevent the shuffle; it only changes how the shuffle is performed. When the service is disabled, the shuffle is handled by the executors themselves.
You can disable the shuffle service by adding spark.shuffle.service.enabled false to the cluster's Spark config (AWS | Azure | GCP).
spark.shuffle.service.enabled false
Restart the cluster after updating the Spark config.
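If you keep cluster Spark config as text, a quick check before restarting can confirm the setting is present. The helper below is a hypothetical sketch that assumes the format used for spark.shuffle.service.enabled false: one key and value per line, separated by whitespace. It does not call any Databricks API; after the restart, the spark.conf.get call from the Cause section is the check against the running cluster.

```python
def parse_spark_config(text: str) -> dict:
    """Parse 'key value' lines (one setting per line) into a dict."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        parts = line.split(None, 1)  # split on the first run of whitespace
        settings[parts[0]] = parts[1].strip() if len(parts) > 1 else ""
    return settings


cluster_config = """
spark.shuffle.service.enabled false
spark.sql.shuffle.partitions 200
"""  # the second line is just an unrelated example setting
settings = parse_spark_config(cluster_config)
print(settings.get("spark.shuffle.service.enabled") == "false")  # prints: True
```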