Job fails with a TimeoutException error

This error is usually caused by a Broadcast join that takes excessively long to complete.

Written by swetha.nandajan

Last published at: March 3rd, 2023

Problem

You are running Apache Spark SQL queries that perform join operations on DataFrames, but the queries keep failing with a TimeoutException error message.

Example stack trace

Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
               at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
               at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
               at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
               at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)

Cause

This problem usually occurs when Spark performs a Broadcast join and fetching the broadcast blocks from the driver and executors takes an excessive amount of time.

Spark distributes the data for a Broadcast join using a BitTorrent-like protocol. The driver splits the data to be broadcast into small chunks and stores the chunks in its own block manager. The driver also sends the chunks of data to the executors, and each executor keeps copies of the chunks in its own block manager.

When an executor cannot find the chunks in its local block manager (for example, because the executor died and was relaunched), it fetches the broadcast data from the driver as well as from other executors. This prevents the driver from becoming a bottleneck when serving remote requests.
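To make the chunked, peer-to-peer distribution more concrete, here is a toy Python model of it. It is a simplification, not Spark's actual broadcast implementation: block managers are plain dictionaries, the tiny BLOCK_SIZE is chosen only for readability (Spark's real chunk size is controlled by spark.broadcast.blockSize, 4 MB by default), and the names split_into_blocks and fetch_broadcast are hypothetical.

```python
from __future__ import annotations

import random

# Toy chunk size in bytes (hypothetical). Spark's real chunk size is
# spark.broadcast.blockSize, 4 MB by default.
BLOCK_SIZE = 4


def split_into_blocks(data: bytes, block_size: int = BLOCK_SIZE) -> list[bytes]:
    """Driver side: split the broadcast payload into fixed-size chunks."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]


def fetch_broadcast(num_blocks: int, local: dict[int, bytes],
                    peers: list[dict[int, bytes]], rng: random.Random) -> bytes:
    """Executor side: fill in missing chunks, then reassemble the payload.

    `local` models this executor's block manager; `peers` model the block
    managers of the driver and the other executors.
    """
    # Visit chunks in random order so concurrent executors spread their requests.
    order = list(range(num_blocks))
    rng.shuffle(order)
    for block_id in order:
        if block_id in local:
            continue  # already cached locally, no remote fetch needed
        holders = [peer for peer in peers if block_id in peer]
        if not holders:
            raise RuntimeError(f"chunk {block_id} is not available from any peer")
        # Any holder can serve the chunk, so the driver is not the only source.
        # Storing it locally means this executor can now serve it to others.
        local[block_id] = rng.choice(holders)[block_id]
    return b"".join(local[i] for i in range(num_blocks))


if __name__ == "__main__":
    rng = random.Random(42)
    payload = b"rows of a small dimension table"
    blocks = split_into_blocks(payload)

    driver = dict(enumerate(blocks))                               # driver holds every chunk
    executor_a = {i: driver[i] for i in range(0, len(blocks), 2)}  # partial copy
    relaunched = {}                                                # block manager lost on restart

    result = fetch_broadcast(len(blocks), relaunched, [driver, executor_a], rng)
    assert result == payload
    print(f"reassembled {len(blocks)} chunks: {result.decode()}")
```

In this model the relaunched executor pulls each missing chunk from whichever peer has it, so chunks held by executor A do not have to come from the driver.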

Even with this distributed approach, there are some scenarios where the broadcast can take an excessive amount of time, resulting in a TimeoutException error.

  • Busy driver or busy executor: If the Spark driver or executors are extremely busy, the broadcast process can be delayed. If the broadcast takes longer than the broadcast timeout (spark.sql.broadcastTimeout, 300 seconds by default), it fails with a broadcast timeout.
  • Large broadcast data size: Trying to broadcast a large amount of data can also result in a broadcast timeout. Spark also limits broadcast data to 8GB.
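The two scenarios above can be combined into a rough back-of-the-envelope check. This is a sketch under stated assumptions: throughput_bytes_per_s is a hypothetical effective rate for building and distributing the broadcast that you would have to measure on your own cluster, a busy driver or executors effectively lowers that rate, and broadcast_risk is a hypothetical helper, not a Spark API.

```python
# Spark refuses to broadcast data of roughly 8 GB (8 GiB) or more.
MAX_BROADCAST_BYTES = 8 * 1024**3
# Default value of spark.sql.broadcastTimeout, in seconds.
DEFAULT_BROADCAST_TIMEOUT_S = 300


def broadcast_risk(size_bytes: float, throughput_bytes_per_s: float,
                   timeout_s: float = DEFAULT_BROADCAST_TIMEOUT_S) -> str:
    """Rough check of a planned broadcast against the size limit and the timeout.

    throughput_bytes_per_s is a hypothetical effective rate for collecting and
    distributing the data; measure it on your own cluster.
    """
    if size_bytes >= MAX_BROADCAST_BYTES:
        return "exceeds the 8GB broadcast limit"
    estimated_s = size_bytes / throughput_bytes_per_s
    if estimated_s > timeout_s:
        return f"likely timeout (~{estimated_s:.0f}s > {timeout_s:.0f}s)"
    return f"ok (~{estimated_s:.0f}s of {timeout_s:.0f}s)"


if __name__ == "__main__":
    GiB, MiB = 1024**3, 1024**2
    print(broadcast_risk(9 * GiB, 500 * MiB))               # too large to broadcast at all
    print(broadcast_risk(1 * GiB, 2 * MiB))                 # 512 s estimate vs 300 s timeout
    print(broadcast_risk(1 * GiB, 2 * MiB, timeout_s=600))  # same broadcast, doubled timeout
```

The numbers in the example are made up; the point is that a slower effective rate (a busy cluster) or a larger payload pushes the estimate past the timeout, while data at or above 8GB cannot be broadcast regardless of the timeout.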

Solution

You need to identify the query that is causing the resource bottleneck on the cluster. Open the Spark UI (AWS | Azure | GCP) and review any failed stages to locate the SQL query causing the failure. Review the Spark SQL plan to see if it uses BroadcastNestedLoopJoin.
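If you have the physical plan as text (for example, copied from the query details in the Spark UI or from EXPLAIN output), a small helper can report which Broadcast join operators it contains. This is a minimal sketch; broadcast_joins_in_plan is a hypothetical name, and it only matches the operator names BroadcastNestedLoopJoin and BroadcastHashJoin as they appear in plan strings.

```python
from __future__ import annotations

import re

# Physical-plan operator names that indicate a Broadcast join.
BROADCAST_JOIN_OPERATORS = ("BroadcastNestedLoopJoin", "BroadcastHashJoin")


def broadcast_joins_in_plan(plan_text: str) -> set[str]:
    """Return the Broadcast join operators that appear in a physical plan string."""
    return {op for op in BROADCAST_JOIN_OPERATORS
            if re.search(rf"\b{op}\b", plan_text)}


if __name__ == "__main__":
    # Hypothetical plan text, in the general shape printed by EXPLAIN.
    plan = """== Physical Plan ==
*(2) BroadcastNestedLoopJoin BuildRight, Inner, (id#0L < id#2L)
:- *(2) Range (0, 100, step=1, splits=8)
+- BroadcastExchange IdentityBroadcastMode, [id=#20]
   +- *(1) Range (0, 10, step=1, splits=8)"""
    found = broadcast_joins_in_plan(plan)
    if "BroadcastNestedLoopJoin" in found:
        print("Plan uses BroadcastNestedLoopJoin: follow the dedicated article.")
    elif found:
        print("Plan uses a Broadcast join: consider disabling it for this query only.")
    else:
        print("No Broadcast join in this plan.")
```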

  • If the Spark SQL plan uses BroadcastNestedLoopJoin, you need to follow the instructions in the Disable broadcast when query plan has BroadcastNestedLoopJoin article.
  • If the Spark SQL plan does not use BroadcastNestedLoopJoin, you can disable Broadcast join by setting the following Spark config values immediately before the problematic query, and then revert them after the query completes. Scoping the change to that query lets other queries continue to benefit from Broadcast join.
    • SET spark.sql.autoBroadcastJoinThreshold=-1
      This disables automatic Broadcast join.
    • SET spark.databricks.adaptive.autoBroadcastJoinThreshold=-1
      This disables adaptive Broadcast join.
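If you run the problematic query from a Python notebook, one way to scope the change to that query is a context manager that sets both values to -1 and restores them afterwards, even if the query fails. This is a sketch that assumes a PySpark SparkSession named spark; the helper name broadcast_join_disabled is hypothetical, and it assumes spark.conf.get(key, None) returns None for a key that was never set in the session, in which case the key is unset again rather than overwritten.

```python
from contextlib import contextmanager

# The two settings from the article; "-1" disables the corresponding Broadcast join.
BROADCAST_THRESHOLD_KEYS = (
    "spark.sql.autoBroadcastJoinThreshold",
    "spark.databricks.adaptive.autoBroadcastJoinThreshold",
)


@contextmanager
def broadcast_join_disabled(spark):
    """Disable Broadcast join inside the `with` block, then restore the previous values."""
    previous = {key: spark.conf.get(key, None) for key in BROADCAST_THRESHOLD_KEYS}
    for key in BROADCAST_THRESHOLD_KEYS:
        spark.conf.set(key, "-1")
    try:
        yield
    finally:
        # Runs even if the query raises, so later queries can use Broadcast join again.
        for key, value in previous.items():
            if value is None:
                spark.conf.unset(key)  # was not set explicitly: fall back to the default
            else:
                spark.conf.set(key, value)
```

Run the action (for example, the write or collect) inside the with block, not just the DataFrame definition: the join strategy is chosen when the query is physically planned, which normally happens when an action runs, so a query executed after the block exits would see the restored settings.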

Another option is to increase spark.sql.broadcastTimeout above its default value of 300 seconds. Increasing spark.sql.broadcastTimeout gives the broadcast process more time to finish before the query fails. The downside to this approach is that it may result in longer query times.

For example, setting the value to 600 doubles the time allowed for the Broadcast join to complete.

SET spark.sql.broadcastTimeout=600

This value can be set at the cluster level or the notebook level.
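At the notebook level, the SET statement above can also be issued from Python. The sketch below assumes a PySpark SparkSession named spark; set_broadcast_timeout is a hypothetical helper that rejects values at or below the 300-second default and returns the previous session value so you can restore it later. At the cluster level, the same key and value go in the cluster's Spark config instead (for example, spark.sql.broadcastTimeout 600).

```python
TIMEOUT_KEY = "spark.sql.broadcastTimeout"
DEFAULT_TIMEOUT_S = 300  # Spark's default broadcast timeout, in seconds


def set_broadcast_timeout(spark, seconds: int = 600):
    """Raise the broadcast timeout for the current session and return the previous value.

    Returns None if the key had not been set explicitly in this session.
    """
    if seconds <= DEFAULT_TIMEOUT_S:
        raise ValueError(f"expected a timeout above the {DEFAULT_TIMEOUT_S}-second default")
    previous = spark.conf.get(TIMEOUT_KEY, None)
    # Equivalent to the SQL statement SET spark.sql.broadcastTimeout=<seconds>.
    spark.conf.set(TIMEOUT_KEY, str(seconds))
    return previous
```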