How to use Apache Spark metrics

Learn how to use Apache Spark metrics with Databricks.

Written by Adam Pavlacka

Last published at: May 16th, 2022

This article gives an example of how to monitor Apache Spark components using the Spark configurable metrics system. Specifically, it shows how to register a new metric source and enable a sink.

For detailed information about the Spark components available for metrics collection, including the sinks supported out of the box, see the Spark monitoring documentation (https://spark.apache.org/docs/latest/monitoring.html#metrics).

Info

There are several other ways to collect metrics that give insight into how a Spark job is performing; they are not covered in this article:

  • SparkStatusTracker (Source, API): monitor job, stage, or task progress
  • StreamingQueryListener (Source, API): intercept streaming events
  • SparkListener (Source): intercept events from the Spark scheduler (see the sketch after this list)
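
As a minimal taste of the listener approach, the following sketch logs the duration of each completed task. SparkListener, onTaskEnd, and addSparkListener are standard Spark APIs; the class name and the logging behavior are illustrative.

%scala

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative listener: print the duration of every completed task.
class TaskDurationListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    println(s"Task ${taskEnd.taskInfo.taskId} took ${taskEnd.taskInfo.duration} ms")
  }
}

spark.sparkContext.addSparkListener(new TaskDurationListener)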

For information about using other third-party tools to monitor Spark jobs in Databricks, see Monitor performance (AWS | Azure).

How does this metrics collection system work? Upon instantiation, each executor opens a connection to the driver and passes its metrics over that connection.

The first step is to write a class that extends the Source trait:

%scala

import com.codahale.metrics.{Counter, Histogram, MetricRegistry}
import org.apache.spark.metrics.source.Source

class MySource extends Source {
  override val sourceName: String = "MySource"

  // Registry that holds all metrics exposed by this source
  override val metricRegistry: MetricRegistry = new MetricRegistry

  // A histogram for value distributions and a simple event counter
  val FOO: Histogram = metricRegistry.histogram(MetricRegistry.name("fooHistory"))
  val FOO_COUNTER: Counter = metricRegistry.counter(MetricRegistry.name("fooCounter"))
}

The next step is to enable the sink. In this example, the metrics are printed to the console:

%scala

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession
  .builder
  .master("local[*]")
  .appName("MySourceDemo")
  .config("spark.driver.host", "localhost")
  // Route every metric source to the built-in console sink
  .config("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.sink.ConsoleSink")
  .getOrCreate()
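
The console sink also accepts a polling period. The period and unit keys below are standard sink options in the Spark metrics system; the 5-second interval is an arbitrary choice for illustration:

%scala

// Variant of the session above with an explicit reporting interval.
// period/unit are standard sink options; 5 seconds is illustrative.
val sparkTuned: SparkSession = SparkSession
  .builder
  .master("local[*]")
  .appName("MySourceDemo")
  .config("spark.driver.host", "localhost")
  .config("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.sink.ConsoleSink")
  .config("spark.metrics.conf.*.sink.console.period", "5")
  .config("spark.metrics.conf.*.sink.console.unit", "seconds")
  .getOrCreate()
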
Info

To sink metrics to Prometheus, you can use this third-party library: https://github.com/banzaicloud/spark-metrics.
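
A sketch of wiring that sink through the same configuration mechanism follows. The class name and Pushgateway option key are taken from the banzaicloud/spark-metrics README and should be verified against the library version you deploy:

%scala

import org.apache.spark.sql.SparkSession

// Hypothetical: the sink class and option keys come from the third-party
// banzaicloud/spark-metrics project; confirm them for your version.
val sparkProm: SparkSession = SparkSession
  .builder
  .appName("PrometheusSinkDemo")
  .config("spark.metrics.conf.*.sink.prometheus.class", "com.banzaicloud.spark.metrics.sink.PrometheusSink")
  .config("spark.metrics.conf.*.sink.prometheus.pushgateway-address", "localhost:9091") // assumed Pushgateway endpoint
  .getOrCreate()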

The last step is to instantiate the source and register it with SparkEnv:

%scala

import org.apache.spark.SparkEnv

// Create the custom source and register it with the Spark metrics system
val source: MySource = new MySource
SparkEnv.get.metricsSystem.registerSource(source)
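
Once the source is registered, update its metrics from your job code and the configured sink reports them on its schedule. The values below are illustrative; update and inc are standard Dropwizard (codahale) metrics calls:

%scala

// Illustrative: record an observation and bump the counter.
source.FOO.update(42)
source.FOO_COUNTER.inc()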

You can view a complete, buildable example at https://github.com/newroyker/meter.