When you run a Structured Streaming application that reads from or writes to cloud storage (S3, ADLS Gen2, etc.), it is easy to incur an excessive number of storage transactions.
Failing to specify a .trigger option in your streaming code is one common reason for a high number of storage transactions. When no .trigger option is specified, the next micro-batch starts as soon as the previous one completes, so the storage is polled almost continuously.
The default behavior is described in the official Apache Spark documentation on triggers as, "If no trigger setting is explicitly specified, then by default, the query will be executed in micro-batch mode, where micro-batches will be generated as soon as the previous micro-batch has completed processing."
This sample code does not have a .trigger option defined. If run, it would result in excessive storage transactions.
%python
# writeConfig is assumed to be a dict of writer options defined elsewhere.
(spark.readStream.format("delta").load("<delta_table_path>")
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "<checkpoint_path>")
    .options(**writeConfig)
    .start())
You can reduce the number of storage transactions by setting the .trigger option on the .writeStream. Setting the .trigger processing time to a few seconds stops the query from polling the source at very short intervals.
Instructions
When no trigger is set and no new data is available, the default behavior is to check the source for updates every 10 ms. For most users, a longer interval between source checks has no noticeable effect on performance, but it greatly reduces transaction costs.
For example, let's use a processing time of 5 seconds. That interval is 500 times longer than 10 ms, so an idle stream checks the source about 500 times less often and the storage calls are reduced accordingly.
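The arithmetic behind that factor of 500 can be checked with a short sketch. It assumes an idle stream that checks the source exactly once per interval. The function name `polls_per_day` is illustrative, not part of any Spark API. The number of storage requests issued per check depends on the source and is not modeled here.

```python
# Rough count of source checks per day for an idle stream.
# Assumption: exactly one check per polling interval, with no processing time.
MS_PER_DAY = 24 * 60 * 60 * 1000


def polls_per_day(interval_ms: int) -> int:
    """Return how many times per day the source is checked at this interval."""
    if interval_ms <= 0:
        raise ValueError("interval_ms must be positive")
    return MS_PER_DAY // interval_ms


default_polls = polls_per_day(10)    # 10 ms default polling interval
tuned_polls = polls_per_day(5000)    # 5 second processing time trigger

print(f"10 ms interval: {default_polls:,} checks per day")
print(f"5 s interval:   {tuned_polls:,} checks per day")
print(f"reduction:      {default_polls // tuned_polls}x")
```

Under these assumptions the idle stream goes from 8,640,000 checks per day to 17,280.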
Setting a processing time of 5 seconds requires adding .trigger(processingTime='5 seconds') to the .writeStream.
Modifying the earlier sample code to use a .trigger processing time of 5 seconds requires adding only one line.
%python
# writeConfig is assumed to be a dict of writer options defined elsewhere.
(spark.readStream.format("delta").load("<delta_table_path>")
    .writeStream
    .format("delta")
    .trigger(processingTime='5 seconds')  # Added line that defines the .trigger processing time.
    .outputMode("append")
    .option("checkpointLocation", "<checkpoint_path>")
    .options(**writeConfig)
    .start())
You should experiment with the .trigger processing time to find a value that balances latency against transaction cost for your application.
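One way to make that experimentation easier is to keep the interval in a single place instead of hard-coding the string. The sketch below is only a suggestion. The helper name `processing_time_trigger` is hypothetical. It builds the keyword argument that `.trigger()` accepts, so it can be tried without a running cluster.

```python
def processing_time_trigger(seconds: int) -> dict:
    """Build keyword arguments for DataStreamWriter.trigger().

    Hypothetical helper: it only formats the interval string, for example
    {"processingTime": "5 seconds"}.
    """
    if seconds <= 0:
        raise ValueError("seconds must be a positive integer")
    return {"processingTime": f"{seconds} seconds"}


# Candidate intervals to compare; the values are illustrative only.
for candidate in (1, 5, 30):
    print(processing_time_trigger(candidate))
```

In the streaming query, the result would be passed as `.trigger(**processing_time_trigger(5))`, which is equivalent to `.trigger(processingTime='5 seconds')`.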