Optimize streaming transactions with .trigger

Use .trigger to define the storage update interval. A higher value reduces the number of storage transactions.

Written by chetan.kardekar

Last published at: October 26th, 2022

When running a Structured Streaming application that uses cloud storage buckets (S3, ADLS Gen2, etc.), it is easy to incur excessive transactions as you access the storage bucket.

Failing to specify a .trigger option in your streaming code is one common reason for a high number of storage transactions. When no .trigger option is specified, the source storage is polled frequently, because by default a new micro-batch starts as soon as the previous micro-batch completes.

The default behavior is described in the official Apache Spark documentation on triggers as, "If no trigger setting is explicitly specified, then by default, the query will be executed in micro-batch mode, where micro-batches will be generated as soon as the previous micro-batch has completed processing."

The following sample code does not define a .trigger option. If run, it would result in excessive storage transactions.

%python

(spark.readStream.format("delta").load("<delta_table_path>")
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "<checkpoint_path>")
  .options(**writeConfig)  # writeConfig: a dict of additional writer options
  .start()
)


You can reduce the number of storage transactions by setting the .trigger option on the .writeStream. Setting the .trigger processing time to a few seconds prevents constant short polling of the source.

Instructions

By default, the source is checked for updates every 10 ms. For most workloads, a longer interval between source checks has no noticeable effect on performance, while transaction costs are greatly reduced.

For example, let's use a processing time of 5 seconds. That is 500 times longer than the default 10 ms interval, so the number of storage calls is reduced accordingly.

Setting a processing time of 5 seconds requires adding .trigger(processingTime='5 seconds') to the .writeStream.

For example, modifying our existing sample code to include a .trigger processing time of 5 seconds only requires the addition of one line.

%python

(spark.readStream.format("delta").load("<delta_table_path>")
  .writeStream
  .format("delta")
  .trigger(processingTime='5 seconds')  # Added line that defines the .trigger processing time.
  .outputMode("append")
  .option("checkpointLocation", "<checkpoint_path>")
  .options(**writeConfig)  # writeConfig: a dict of additional writer options
  .start()
)


You should experiment with the .trigger processing time to determine a value that is optimized for your application.
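
Depending on the workload, other trigger modes can cut storage transactions even further. The sketch below is illustrative only: the table and checkpoint paths are placeholders, and the availableNow trigger assumes a runtime based on Apache Spark 3.3.0 or above.

%python

# Illustrative sketch of alternative trigger settings; the paths are placeholders.
# Use only one trigger setting per query.

source_stream = spark.readStream.format("delta").load("<delta_table_path>")

# Option 1: a longer fixed interval, polling the source once per minute.
(source_stream.writeStream
  .format("delta")
  .trigger(processingTime='1 minute')
  .outputMode("append")
  .option("checkpointLocation", "<checkpoint_path_1>")
  .start()
)

# Option 2: process everything currently available, then stop
# (assumes Apache Spark 3.3.0 or above for availableNow).
(source_stream.writeStream
  .format("delta")
  .trigger(availableNow=True)
  .outputMode("append")
  .option("checkpointLocation", "<checkpoint_path_2>")
  .start()
)

For pipelines that only need periodic refreshes, running an availableNow query on a schedule avoids polling the source entirely between runs.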