Cluster slowdown due to Ganglia metrics filling root partition
Note
This article applies to Databricks Runtime 7.3 LTS and below.
Problem
Clusters start slowing down and may show a combination of the following symptoms:
- Unhealthy cluster events are reported:
  - Request timed out. Driver is temporarily unavailable.
  - Metastore is down.
  - DBFS is down.
- You do not see any high GC events or memory utilization associated with the driver process.
- When you use top on the driver node, you see an intermittent high average load.
- The Ganglia-related gmetad process shows intermittent high CPU utilization.
- The root disk shows high disk usage with df -h /. Specifically, /var/lib/ganglia/rrds shows high disk usage (see the example commands after this list).
- The Ganglia UI is unable to show the load distribution.
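For example, the following shell commands (a minimal sketch; du -sh is a standard Linux utility not shown elsewhere in this article) report how full the root partition is and how much of it the Ganglia RRD directory is consuming:
%sh df -h /
%sh du -sh /var/lib/ganglia/rrds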
You can verify the issue by looking for files with local in the prefix in /var/lib/ganglia/rrds. Generally, this directory should only have files prefixed with application-<applicationId>.
For example:
%sh ls -ltrhR /var/lib/ganglia/rrds/ | grep -i local
-rw-rw-rw- 1 ganglia ganglia 616K Jun 29 18:00 local-1593453624916.driver.Databricks.directoryCommit.markerReadErrors.count.rrd
-rw-rw-rw- 1 ganglia ganglia 616K Jun 29 18:00 local-1593453614595.driver.Databricks.directoryCommit.deletedFilesFiltered.count.rrd
-rw-rw-rw- 1 ganglia ganglia 616K Jun 29 18:00 local-1593453614595.driver.Databricks.directoryCommit.autoVacuumCount.count.rrd
-rw-rw-rw- 1 ganglia ganglia 616K Jun 29 18:00 local-1593453605184.driver.CodeGenerator.generatedMethodSize.min.rrd
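For contrast, a quick sketch of listing the expected application-prefixed files in a healthy directory:
%sh ls -ltrh /var/lib/ganglia/rrds/ | grep -i application | head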
Cause
Ganglia metrics typically use less than 10GB of disk space. However, under certain circumstances, a “data explosion” can occur, which causes the root partition to fill with Ganglia metrics. Data explosions also create a dirty cache. When this happens, the Ganglia metrics can consume more than 100GB of disk space on the root partition.
This “data explosion” can happen if you define the spark session variable as a global in your Python file and then call functions defined in the same file to perform Apache Spark transformations on data. When this happens, the Spark session logic can be serialized along with the required function definition, resulting in a Spark session being created on a worker node.
For example, take the following Spark session definition:
from pyspark.sql import SparkSession

def get_spark():
    """Returns a spark session."""
    return SparkSession.builder.getOrCreate()

if "spark" not in globals():
    spark = get_spark()

def generator(partition):
    print(globals()['spark'])
    for row in partition:
        yield [word.lower() for word in row["value"]]
If you use the following example commands, local-prefixed files are created:
from repro import ganglia_test
df = spark.createDataFrame([(["Hello"], ), (["Spark"], )], ["value"])
df.rdd.mapPartitions(ganglia_test.generator).toDF(["value"]).show()
The print(globals()['spark']) statement in the generator() function does not result in an error when the Spark session has been serialized to the worker nodes, because spark is then available there as a global variable. In other cases it fails with an invalid key error, because that value is not available as a global variable on the workers. Streaming jobs that execute on short batch intervals are especially susceptible to this issue.
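As an illustrative sketch only, and not the documented resolution for this issue: if the worker-side function is written without any reference to the module-level spark variable, the session logic is not pulled along when the function is shipped to the workers. The code below mirrors the earlier example and assumes the function is defined directly in a notebook rather than imported from the repro module.
from pyspark.sql import SparkSession

# Driver-side session; in a Databricks notebook this already exists as spark.
spark = SparkSession.builder.getOrCreate()

def generator(partition):
    # No reference to globals()['spark'] here, so only the function body
    # is serialized to the workers.
    for row in partition:
        yield [word.lower() for word in row["value"]]

df = spark.createDataFrame([(["Hello"], ), (["Spark"], )], ["value"])
df.rdd.mapPartitions(generator).toDF(["value"]).show()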