Clusters start slowing down and may show a combination of the following symptoms:
- Unhealthy cluster events are reported:
  - Request timed out. Driver is temporarily unavailable.
  - Metastore is down.
  - DBFS is down.
- You do not see any high GC events or memory utilization associated with the driver process.
- When you use `top` on the driver node, you see an intermittent high average load.
- The Ganglia-related `gmetad` process shows intermittent high CPU utilization.
- The root disk shows high disk usage with `df -h /`. Specifically, `/var/lib/ganglia/rrds` shows high disk usage.
- The Ganglia UI is unable to show the load distribution.
You can verify the issue by looking for files with `local` in the prefix in `/var/lib/ganglia/rrds`. Normally this directory should not contain `local` prefixed files, so output like the following indicates the problem:

```
%sh ls -ltrhR /var/lib/ganglia/rrds/ | grep -i local
-rw-rw-rw- 1 ganglia ganglia 616K Jun 29 18:00 local-1593453624916.driver.Databricks.directoryCommit.markerReadErrors.count.rrd
-rw-rw-rw- 1 ganglia ganglia 616K Jun 29 18:00 local-1593453614595.driver.Databricks.directoryCommit.deletedFilesFiltered.count.rrd
-rw-rw-rw- 1 ganglia ganglia 616K Jun 29 18:00 local-1593453614595.driver.Databricks.directoryCommit.autoVacuumCount.count.rrd
-rw-rw-rw- 1 ganglia ganglia 616K Jun 29 18:00 local-1593453605184.driver.CodeGenerator.generatedMethodSize.min.rrd
```
Ganglia metrics typically use less than 10GB of disk space. However, under certain circumstances, a “data explosion” can occur, which causes the root partition to fill with Ganglia metrics. Data explosions also create a dirty cache. When this happens, the Ganglia metrics can consume more than 100GB of disk space on root.
This “data explosion” can happen if you define the `spark` session variable as a global in your Python file and then call functions defined in the same file to perform Apache Spark transformations on your data. When this happens, the Spark session logic can be serialized along with the required function definition, resulting in a Spark session being created on the worker node.
For example, take the following Spark session definition:
```python
from pyspark.sql import SparkSession

def get_spark():
    """Returns a spark session."""
    return SparkSession.builder.getOrCreate()

if "spark" not in globals():
    spark = get_spark()

def generator(partition):
    print(globals()['spark'])
    for row in partition:
        yield [word.lower() for word in row["value"]]
```
If you run the following example commands, `local` prefixed files are created:

```python
from repro import ganglia_test

df = spark.createDataFrame([(["Hello"], ), (["Spark"], )], ["value"])
df.rdd.mapPartitions(ganglia_test.generator).toDF(["value"]).show()
```
The `print(globals()['spark'])` statement in the `generator()` function doesn’t usually result in an error, because the serialized Spark session is available as a global variable on the worker nodes. In some cases it can instead fail with an invalid key error, when that value is not available as a global variable. Streaming jobs that execute on short batch intervals are especially susceptible to this issue.
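One way to avoid the problem, sketched here under the assumption that the transformation itself doesn’t need the session, is to keep any reference to the Spark session (direct or via `globals()`) out of functions that are shipped to the workers:

```python
def generator(partition):
    # No reference to the Spark session here, so nothing session-related
    # is captured and serialized when this function is sent to the workers.
    for row in partition:
        yield [word.lower() for word in row["value"]]
```

The `df.rdd.mapPartitions(...)` call from the earlier example works unchanged with this version; only the driver-side code should hold the session.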