Problem
Clusters start slowing down and may show a combination of the following symptoms:
- Unhealthy cluster events are reported:
  - Request timed out. Driver is temporarily unavailable.
  - Metastore is down.
  - DBFS is down.
- You do not see any high GC events or memory utilization associated with the driver process.
- When you run top on the driver node, you see intermittently high average load.
- The Ganglia-related gmetad process shows intermittent high CPU utilization.
- The root disk shows high disk usage with df -h /. Specifically, /var/lib/ganglia/rrds shows high disk usage (see the sketch after this list).
- The Ganglia UI is unable to show the load distribution.
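If you want to script the disk checks instead of running them by hand, the following is a minimal sketch that performs the equivalent of df -h / and sizes /var/lib/ganglia/rrds from a notebook cell. The paths are the ones named above; nothing else is assumed.

```
%python
import os
import shutil

# Equivalent of `df -h /`: report overall usage of the root partition.
total, used, free = shutil.disk_usage("/")
print(f"root disk: {used / 2**30:.1f} GiB used of {total / 2**30:.1f} GiB")

# Size the Ganglia RRD directory, the usual culprit in this scenario.
rrd_bytes = 0
for dirpath, _, filenames in os.walk("/var/lib/ganglia/rrds"):
    for name in filenames:
        rrd_bytes += os.path.getsize(os.path.join(dirpath, name))
print(f"/var/lib/ganglia/rrds: {rrd_bytes / 2**30:.1f} GiB")
```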
You can verify the issue by looking for files with a local prefix in /var/lib/ganglia/rrds. Generally, this directory should only contain files prefixed with application-<applicationId>.
For example:
```
%sh ls -ltrhR /var/lib/ganglia/rrds/ | grep -i local

-rw-rw-rw- 1 ganglia ganglia 616K Jun 29 18:00 local-1593453624916.driver.Databricks.directoryCommit.markerReadErrors.count.rrd
-rw-rw-rw- 1 ganglia ganglia 616K Jun 29 18:00 local-1593453614595.driver.Databricks.directoryCommit.deletedFilesFiltered.count.rrd
-rw-rw-rw- 1 ganglia ganglia 616K Jun 29 18:00 local-1593453614595.driver.Databricks.directoryCommit.autoVacuumCount.count.rrd
-rw-rw-rw- 1 ganglia ganglia 616K Jun 29 18:00 local-1593453605184.driver.CodeGenerator.generatedMethodSize.min.rrd
```
Cause
Ganglia metrics typically use less than 10 GB of disk space. However, under certain circumstances a “data explosion” can occur, which fills the root partition with Ganglia metrics and also creates a dirty cache. When this happens, the Ganglia metrics can consume more than 100 GB of disk space on the root partition.
This “data explosion” can happen if you define the Spark session variable as a global in a Python file and then call functions defined in that file to perform Apache Spark transformations on data. When this happens, the session creation logic can be serialized along with the required function definition, resulting in a new Spark session being created on each worker node.
For example, take the following Spark session definition:
```
%python
from pyspark.sql import SparkSession

def get_spark():
    """Returns a spark session."""
    return SparkSession.builder.getOrCreate()

if "spark" not in globals():
    spark = get_spark()

def generator(partition):
    print(globals()['spark'])
    for row in partition:
        yield [word.lower() for word in row["value"]]
```
If you run the following example commands, files with the local prefix are created:
```
%python
from repro import ganglia_test

df = spark.createDataFrame([(["Hello"],), (["Spark"],)], ["value"])
df.rdd.mapPartitions(ganglia_test.generator).toDF(["value"]).show()
```
The print(globals()['spark']) statement in the generator() function does not result in an error, because spark is available as a global variable on the worker nodes, where the serialized session logic created it. In some cases, the same statement may instead fail with an invalid key error, because that value is not available as a global variable on the worker. Streaming jobs that execute on short batch intervals are especially susceptible to this issue.
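To see why the session logic reaches the workers at all, note that a function defined in an importable module is pickled by reference (module name plus function name), so each worker imports the module to resolve the function, and that import re-runs the module's top-level code, including the session creation. Spark uses cloudpickle rather than plain pickle, but for module-level functions both serialize by reference. The following sketch only illustrates this mechanism, reusing the repro module from the example above:

```
%python
import pickle
from repro import ganglia_test

# The pickled payload contains only a reference to repro.ganglia_test.generator.
# Unpickling it on a worker imports repro.ganglia_test there, which re-runs the
# module's top-level `spark = get_spark()` and creates a worker-side session.
print(pickle.dumps(ganglia_test.generator))
```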
Solution
Ensure that you are not using SparkSession.builder.getOrCreate() to define a Spark session as a global variable.
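For example, the problematic module above can be restructured so that the worker-side function never references the session. The following is a minimal sketch of one such fix, not the only possible one; it reuses the names from the example and assumes it runs in a Databricks notebook, where spark is already provided on the driver:

```
%python
# The worker-side function is a pure data transformation: it never touches
# `spark` or globals(), so no session logic is serialized to the workers.
def generator(partition):
    for row in partition:
        yield [word.lower() for word in row["value"]]

# Use the session only in driver-side code. In a Databricks notebook, `spark`
# is already provided; there is no need to create a module-level global.
df = spark.createDataFrame([(["Hello"],), (["Spark"],)], ["value"])
df.rdd.mapPartitions(generator).toDF(["value"]).show()
```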
When you troubleshoot, you can use the timestamps on files with the local prefix to help determine when a problematic change was first introduced.
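As a minimal sketch of that check, the following lists the oldest local prefixed RRD files first, using the directory named in the Problem section:

```
%python
import glob
import os
import time

# Oldest local-* files first; their timestamps approximate when the
# problematic Spark session pattern was first introduced.
paths = glob.glob("/var/lib/ganglia/rrds/**/local-*", recursive=True)
for p in sorted(paths, key=os.path.getmtime)[:10]:
    print(time.ctime(os.path.getmtime(p)), p)
```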