Problem
Job fails with an ExecutorLostFailure error message.
ExecutorLostFailure (executor <1> exited caused by one of the running tasks) Reason: Executor heartbeat timed out after <148564> ms
Cause
The ExecutorLostFailure error message means one of the executors in the Apache Spark cluster has been lost. This is a generic error message that can have more than one root cause. In this article, we look at how to resolve the issue when the root cause is a busy executor.
This can happen if the load on an executor is too high and it cannot send a heartbeat signal to the driver within the configured timeout. When that happens, the driver considers the executor lost.
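If you want to confirm which threshold applies to your cluster, you can inspect the Spark configuration properties that govern heartbeats. The snippet below is a minimal sketch assuming standard Apache Spark settings: spark.executor.heartbeatInterval controls how often each executor reports to the driver, and spark.network.timeout is, on most Spark versions, the timeout after which the driver marks the executor as lost.

```python
# Minimal sketch: inspect the heartbeat-related settings on the cluster.
# The fallback values shown are the standard Apache Spark defaults.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# How often each executor sends a heartbeat to the driver (default 10s).
print(spark.conf.get("spark.executor.heartbeatInterval", "10s"))

# How long the driver waits for heartbeats before declaring an executor lost (default 120s).
print(spark.conf.get("spark.network.timeout", "120s"))
```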
How do you determine if a high CPU load is the reason for the executor getting lost?
To confirm that the executors are busy, you should check the Ganglia metrics and review the CPU loads.
If the CPU loads are high:
- you have too many small files
- your job is making too many API calls/requests
- you don't have an optimal partitioning strategy
- you are not using a compute optimized cluster for the workload
Solution
Here is what you need to do, depending on the root cause of the failure.
Are there too many small files?
Compact small files into bigger files. Delta Lake supports OPTIMIZE (AWS | Azure | GCP), which compacts files. Auto optimize on Databricks (AWS | Azure | GCP) automatically compacts small files during writes.
If you are not using Delta Lake, you should plan to create larger files before writing data to your tables. This can be achieved by applying repartition() before writing files to the table location.
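For illustration, here is a minimal sketch of both approaches. The table name `events`, the source table `raw_events`, the partition count, and the output path are placeholders, not values from this article.

```python
# Minimal sketch; table names, partition count, and path are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Delta Lake: compact existing small files with OPTIMIZE.
spark.sql("OPTIMIZE events")

# Non-Delta tables: reduce the number of output files before writing,
# so each written file is larger.
df = spark.table("raw_events")
df.repartition(64).write.mode("overwrite").parquet("/mnt/tables/events")
```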
Are there too many API requests?
Minimize the number of API calls made by your job. This applies to both external API services and any Databricks REST API calls. One way to do this is to repartition the source data into a smaller number of partitions and then make the necessary API calls for each partition, as sketched below.
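As a rough illustration, the sketch below reduces the partition count and issues one batched request per partition. The endpoint URL, the `source_table` name, the `id` column, and the partition count are hypothetical.

```python
# Minimal sketch; the endpoint, table, column, and partition count are placeholders.
import requests
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.table("source_table")

def post_partition(rows):
    # Send one batched request per partition instead of one request per row.
    ids = [row.id for row in rows]
    if ids:
        requests.post("https://api.example.com/batch", json={"ids": ids})

# Fewer partitions means fewer API calls; 8 is a placeholder value.
df.coalesce(8).foreachPartition(post_partition)
```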
It is also good to determine why you have a high number of API requests. For example, if the root cause is too many small files, you should compact as many of them as possible into larger files. A few large files are preferable to many small files.
Are there too many partitions?
One common mistake is over-partitioning the data source. Having many thousands of partitions is not optimal. Ideally, you should have a small number of partitions that can be processed in parallel by the cores available in your cluster.
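If the over-partitioning is at the level of in-memory Spark partitions, a rough starting point is to align the partition count with the cores available to the cluster. This is a minimal sketch; the source table and the multiplier are placeholders.

```python
# Minimal sketch; source table and multiplier are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.table("raw_events")

# defaultParallelism typically reflects the total executor cores.
target_partitions = spark.sparkContext.defaultParallelism * 2
df = df.repartition(target_partitions)
```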
Keep your partitions to as few levels as possible. For example, instead of a partition structure like yr=/month=/day=/, you can collapse the date into a single level, such as date=yyyy-MM-dd.
Avoid partitioning data on a column with high cardinality, such as an id column. Instead, pick a column that is commonly used in your queries but has a lower cardinality.
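The sketch below puts these two points together: it derives a single, low-cardinality `date` column from a timestamp and partitions by that, instead of by a deep yr/month/day hierarchy or a high-cardinality id. The table name, column names, and output path are placeholders.

```python
# Minimal sketch; table, columns, and path are placeholders.
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date

spark = SparkSession.builder.getOrCreate()
df = spark.table("raw_events")  # assumed to have an `event_ts` timestamp column

(
    df.withColumn("date", to_date("event_ts"))  # single low-cardinality partition column
      .write.mode("overwrite")
      .partitionBy("date")
      .parquet("/mnt/tables/events_by_date")
)
```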
Are the CPU loads in Ganglia metrics high?
- The CPU loads are high.
- The cluster utilization is more than 80%.
- Certain nodes in the Ganglia metrics appear red.

If any of these are true, you are not using the right type of cluster for the workload.
Make sure your selected cluster has enough CPU cores and available memory. You can also try using a Compute optimized worker type instead of the default Storage optimized worker type for your cluster.