How to explore Apache Spark metrics with Spark listeners

Learn how to explore Apache Spark metrics using Spark listeners with Databricks.

Written by Adam Pavlacka

Last published at: May 16th, 2022

Apache Spark provides several useful internal listeners that track metrics about tasks and jobs. During the development cycle, for example, these metrics can help you understand when and why a task takes a long time to finish. You can also view per-task and per-stage information in the Spark UI or the Spark History Server UI, but those have downsides: you can't compare the statistics for two Spark jobs side by side, and the Spark History Server UI can take a long time to load for large Spark jobs.

You can extract the metrics that Spark's internal classes generate and persist them as a DataFrame or save them to disk as a table. You can then query that DataFrame just like any other table.
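To see the mechanics behind this approach, here is a minimal sketch, independent of the package used in this article, that registers a plain SparkListener, records a few fields from each finished task, and turns the collected rows into a DataFrame. The case class, the fields recorded, and the example query are illustrative assumptions.

%scala

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import spark.implicits._

// One row per completed task, with a small subset of the available metrics.
case class TaskMetricRow(stageId: Int, taskId: Long, executorRunTime: Long, jvmGCTime: Long, recordsRead: Long)

class SimpleTaskMetricsListener extends SparkListener {
  val rows = ArrayBuffer[TaskMetricRow]()

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null if a task fails before it reports metrics.
    Option(taskEnd.taskMetrics).foreach { m =>
      rows += TaskMetricRow(
        taskEnd.stageId,
        taskEnd.taskInfo.taskId,
        m.executorRunTime,
        m.jvmGCTime,
        m.inputMetrics.recordsRead)
    }
  }
}

val listener = new SimpleTaskMetricsListener
spark.sparkContext.addSparkListener(listener)

// Any query with an action triggers a job, which produces task metrics.
spark.range(1000000).selectExpr("sum(id)").show()

// Listener events are posted asynchronously, so the newest tasks may arrive
// a moment after the action returns.
spark.sparkContext.removeSparkListener(listener)
listener.rows.toSeq.toDF().show(false)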

You can use the SparkTaskMetrics package, which builds on the same listener mechanism, to extract metrics from tasks and jobs.

Build the Spark Metrics package

Use the following command to build the package.

%sh

sbt package
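The package ships with its own build definition, but if you assemble a similar project yourself, a minimal build.sbt along these lines is a reasonable starting point. The Scala and Spark versions below are assumptions; match them to your cluster's runtime.

name := "spark-task-metrics"

scalaVersion := "2.12.15"

// Spark is provided by the cluster runtime, so keep it out of the packaged jar.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.1" % "provided"

sbt package writes the resulting jar under target/scala-2.12/. Attach it to your cluster as a library so the import in the next section resolves.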

Gather metrics

Import TaskMetricsExplorer and pass the query sql("""SELECT * FROM nested_data""").show(false) into runAndMeasure. The query must include at least one Spark action to trigger a Spark job; Spark does not generate any metrics until a job is executed.

For example:

%scala

import com.databricks.TaskMetricsExplorer

val t = new TaskMetricsExplorer(spark)
sql("""CREATE OR REPLACE TEMPORARY VIEW nested_data AS
       SELECT id AS key,
       ARRAY(CAST(RAND(1) * 100 AS INT), CAST(RAND(2) * 100 AS INT), CAST(RAND(3) * 100 AS INT), CAST(RAND(4) * 100 AS INT), CAST(RAND(5) * 100 AS INT)) AS values,
       ARRAY(ARRAY(CAST(RAND(1) * 100 AS INT), CAST(RAND(2) * 100 AS INT)), ARRAY(CAST(RAND(3) * 100 AS INT), CAST(RAND(4) * 100 AS INT), CAST(RAND(5) * 100 AS INT))) AS nested_values
       FROM range(5)""")
// Pass the query, including its action, directly to runAndMeasure so the
// listener can capture the metrics of the job it triggers.
val res = t.runAndMeasure(sql("""SELECT * FROM nested_data""").show(false))

The runAndMeasure method runs the query and collects the tasks' internal metrics using a Spark listener. It returns the query result:

+---+-------------------+-----------------------+
|key|values             |nested_values          |
+---+-------------------+-----------------------+
|0  |[26, 11, 66, 8, 47]|[[26, 11], [66, 8, 47]]|
|1  |[66, 8, 47, 91, 6] |[[66, 8], [47, 91, 6]] |
|2  |[8, 47, 91, 6, 70] |[[8, 47], [91, 6, 70]] |
|3  |[91, 6, 70, 41, 19]|[[91, 6], [70, 41, 19]]|
|4  |[6, 70, 41, 19, 12]|[[6, 70], [41, 19, 12]]|
+---+-------------------+-----------------------+

The task metrics are saved in the res DataFrame. You can display selected columns with this command:

%scala

res.select($"stageId", $"taskType", $"taskLocality", $"executorRunTime", $"duration", $"executorId", $"host", $"jvmGCTime").show(false)

Then you get:

+-------+----------+-------------+---------------+--------+----------+---------+---------+
|stageId|taskType  |taskLocality |executorRunTime|duration|executorId| host    |jvmGCTime|
+-------+----------+-------------+---------------+--------+----------+---------+---------+
|3      |ResultTask|PROCESS_LOCAL|2              |9       |driver    |localhost|0        |
|4      |ResultTask|PROCESS_LOCAL|3              |11      |driver    |localhost|0        |
|4      |ResultTask|PROCESS_LOCAL|3              |16      |driver    |localhost|0        |
|4      |ResultTask|PROCESS_LOCAL|2              |20      |driver    |localhost|0        |
|4      |ResultTask|PROCESS_LOCAL|4              |22      |driver    |localhost|0        |
|5      |ResultTask|PROCESS_LOCAL|2              |12      |driver    |localhost|0        |
|5      |ResultTask|PROCESS_LOCAL|3              |17      |driver    |localhost|0        |
|5      |ResultTask|PROCESS_LOCAL|7              |21      |driver    |localhost|0        |
+-------+----------+-------------+---------------+--------+----------+---------+---------+
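Because res is an ordinary DataFrame, you can also persist it and compare the statistics of several jobs side by side, something the Spark UI does not offer. The table name and run label in the following example are arbitrary choices.

%scala

import org.apache.spark.sql.functions._

// Persist this run's task metrics under a label that identifies the run.
res.withColumn("runId", lit("nested_data_run_1"))
  .write
  .mode("append")
  .saveAsTable("task_metrics_history")

// Compare per-stage executor run time and task duration across saved runs.
spark.table("task_metrics_history")
  .groupBy("runId", "stageId")
  .agg(
    sum("executorRunTime").alias("totalExecutorRunTimeMs"),
    max("duration").alias("maxTaskDurationMs"))
  .orderBy("runId", "stageId")
  .show(false)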

To view the names and data types of all available metrics, display the schema of the res DataFrame:

%scala

res.schema.treeString
root
 |-- stageId: integer (nullable = false)
 |-- stageAttemptId: integer (nullable = false)
 |-- taskType: string (nullable = true)
 |-- index: long (nullable = false)
 |-- taskId: long (nullable = false)
 |-- attemptNumber: integer (nullable = false)
 |-- launchTime: long (nullable = false)
 |-- finishTime: long (nullable = false)
 |-- duration: long (nullable = false)
 |-- schedulerDelay: long (nullable = false)
 |-- executorId: string (nullable = true)
 |-- host: string (nullable = true)
 |-- taskLocality: string (nullable = true)
 |-- speculative: boolean (nullable = false)
 |-- gettingResultTime: long (nullable = false)
 |-- successful: boolean (nullable = false)
 |-- executorRunTime: long (nullable = false)
 |-- executorCpuTime: long (nullable = false)
 |-- executorDeserializeTime: long (nullable = false)
 |-- executorDeserializeCpuTime: long (nullable = false)
 |-- resultSerializationTime: long (nullable = false)
 |-- jvmGCTime: long (nullable = false)
 |-- resultSize: long (nullable = false)
 |-- numUpdatedBlockStatuses: integer (nullable = false)
 |-- diskBytesSpilled: long (nullable = false)
 |-- memoryBytesSpilled: long (nullable = false)
 |-- peakExecutionMemory: long (nullable = false)
 |-- recordsRead: long (nullable = false)
 |-- bytesRead: long (nullable = false)
 |-- recordsWritten: long (nullable = false)
 |-- bytesWritten: long (nullable = false)
 |-- shuffleFetchWaitTime: long (nullable = false)
 |-- shuffleTotalBytesRead: long (nullable = false)
 |-- shuffleTotalBlocksFetched: long (nullable = false)
 |-- shuffleLocalBlocksFetched: long (nullable = false)
 |-- shuffleRemoteBlocksFetched: long (nullable = false)
 |-- shuffleWriteTime: long (nullable = false)
 |-- shuffleBytesWritten: long (nullable = false)
 |-- shuffleRecordsWritten: long (nullable = false)
 |-- errorMessage: string (nullable = true)
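Any of these columns can be used directly in a query against res. For example, the spill and shuffle columns make it easy to isolate problem tasks; the column selection below is only an illustration.

%scala

// List every task that spilled to memory or disk, largest shuffle reads first.
res.filter($"memoryBytesSpilled" > 0 || $"diskBytesSpilled" > 0)
  .select($"stageId", $"taskId", $"duration", $"memoryBytesSpilled", $"diskBytesSpilled", $"shuffleTotalBytesRead")
  .orderBy($"shuffleTotalBytesRead".desc)
  .show(false)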