This article gives an example of how to monitor Apache Spark components using the Spark configurable metrics system. Specifically, it shows how to register a new metrics source and enable a sink.
For detailed information about the Spark components available for metrics collection, including the sinks supported out of the box, follow the documentation link above.
How does this metrics collection system work? Each Spark instance (the driver and every executor) runs its own MetricsSystem, which periodically polls the sources registered with it and reports their metrics to the configured sinks.
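For reference, a source is essentially a named Dropwizard MetricRegistry. The Source trait that the first step implements is minimal; shown here approximately as it appears in Spark's org.apache.spark.metrics.source package. Note that it is package-private in open source Spark, which is why custom sources (as in the linked example at the end) are typically compiled into that package:

%scala
// Approximate shape of Spark's Source trait (org.apache.spark.metrics.source).
// It is private[spark], so a custom source must live inside the Spark package tree.
private[spark] trait Source {
  def sourceName: String              // name the metrics are reported under
  def metricRegistry: MetricRegistry  // Dropwizard registry holding the metrics
}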
The first step is to write a class that extends the Source trait:
%scala
import com.codahale.metrics.{Counter, Histogram, MetricRegistry}
import org.apache.spark.metrics.source.Source

class MySource extends Source {
  override val sourceName: String = "MySource"
  override val metricRegistry: MetricRegistry = new MetricRegistry

  // Register a histogram and a counter under this source's namespace
  val FOO: Histogram = metricRegistry.histogram(MetricRegistry.name("fooHistory"))
  val FOO_COUNTER: Counter = metricRegistry.counter(MetricRegistry.name("fooCounter"))
}
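Histograms and counters are only two of the Dropwizard metric types. If you also need a point-in-time reading, a Gauge can be registered on the same registry. A hypothetical addition to MySource is sketched below; the fooGauge name and the value it reports are illustrative, not part of the example above:

%scala
import com.codahale.metrics.Gauge

// Hypothetical extra metric inside MySource: a gauge computes its value on demand
// each time the sink polls, rather than being updated by application code.
val FOO_GAUGE: Gauge[Long] = metricRegistry.register(
  MetricRegistry.name("fooGauge"),
  new Gauge[Long] {
    override def getValue: Long = Runtime.getRuntime.freeMemory()
  }
)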
The next step is to enable the sink. In this example, the metrics are printed to the console:
%scala
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession
  .builder
  .master("local[*]")
  .appName("MySourceDemo")
  .config("spark.driver.host", "localhost")
  // Send metrics from all instances (driver and executors) to the console sink
  .config("spark.metrics.conf.*.sink.console.class",
          "org.apache.spark.metrics.sink.ConsoleSink")
  .getOrCreate()
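ConsoleSink reports every 10 seconds by default; the interval can be tuned through the same spark.metrics.conf.* properties. A sketch of the builder with the extra options is below; the 5-second period is an arbitrary choice for illustration:

%scala
// Variant of the session above with a tuned reporting interval for ConsoleSink
val tunedSpark: SparkSession = SparkSession
  .builder
  .master("local[*]")
  .appName("MySourceDemo")
  .config("spark.driver.host", "localhost")
  .config("spark.metrics.conf.*.sink.console.class",
          "org.apache.spark.metrics.sink.ConsoleSink")
  .config("spark.metrics.conf.*.sink.console.period", "5")     // report every 5 units
  .config("spark.metrics.conf.*.sink.console.unit", "seconds") // unit for the period
  .getOrCreate()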
The last step is to instantiate the source and register it with SparkEnv:
%scala
import org.apache.spark.SparkEnv

val source: MySource = new MySource
SparkEnv.get.metricsSystem.registerSource(source)
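Once the source is registered, its metrics are updated through the standard Dropwizard calls, and the console sink prints them on its next reporting cycle. A minimal driver-side sketch; the loop and the values recorded are illustrative:

%scala
// Update the registered metrics; ConsoleSink reports them on its next poll
(1 to 100).foreach { i =>
  source.FOO_COUNTER.inc() // increment the counter
  source.FOO.update(i)     // add a sample to the histogram
}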
You can view a complete, buildable example at https://github.com/newroyker/meter.