Optimize a Delta sink in a structured streaming application

You are using a Delta table as the sink for your structured streaming application and you want to optimize the Delta table so that queries are faster.

If your structured streaming application uses a very short trigger interval, each microbatch may not write enough files to be eligible for compaction.

By default, the autoOptimize operation compacts files to a target size of 128 MB, while an explicit optimize operation compacts Delta Lake files to a target size of 1 GB.
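For reference, autoOptimize is typically enabled through table properties on the sink. The following is a minimal sketch, assuming a Databricks environment where the delta.autoOptimize.* table properties are available and an active SparkSession named spark; <table-name> is a placeholder for your sink table.

```scala
// Assumes an active SparkSession named `spark`, as in a Databricks notebook.
// Enables optimized writes and auto compaction on the sink table.
spark.sql("""
  ALTER TABLE <table-name>
  SET TBLPROPERTIES (
    delta.autoOptimize.optimizeWrite = true,
    delta.autoOptimize.autoCompact = true
  )
""")
```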

If you do not have a sufficient number of eligible files in each microbatch, you should optimize the Delta table files periodically.

Use foreachBatch with a mod value

One of the easiest ways to periodically optimize the Delta table sink in a structured streaming application is by using foreachBatch with a mod value on the microbatch batchId.

Assume that you have a streaming DataFrame that was created from a Delta table. You use foreachBatch when writing the streaming DataFrame to the Delta sink.

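Within foreachBatch, the mod value of batchId determines when maintenance runs: the optimize operation runs on every 10th microbatch (batchId % 10 == 0), and the zorder operation runs on every 101st microbatch (batchId % 101 == 0). Because batchId starts at 0 for a new query, both operations also run on the first microbatch.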
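To see which microbatches the two mod checks select, the logic can be isolated from Spark. The sketch below is illustrative only: OptimizeSchedule, Maintenance, and due are hypothetical names that do not come from any library, and the defaults of 10 and 101 simply mirror the values used in this article.

```scala
object OptimizeSchedule {
  // Hypothetical labels for the two maintenance operations.
  sealed trait Maintenance
  case object Compaction extends Maintenance
  case object ZOrder extends Maintenance

  // Returns the operations that the mod checks select for a given batchId.
  def due(batchId: Long, optimizeEvery: Long = 10L, zorderEvery: Long = 101L): List[Maintenance] = {
    val compaction: List[Maintenance] =
      if (batchId % optimizeEvery == 0) List(Compaction) else Nil
    val zorder: List[Maintenance] =
      if (batchId % zorderEvery == 0) List(ZOrder) else Nil
    compaction ++ zorder
  }
}

// Batch IDs between 0 and 110 that trigger at least one operation.
val triggered = (0L to 110L).filter(id => OptimizeSchedule.due(id).nonEmpty)
```

Because 10 and 101 share no common factor, the two operations coincide only when batchId is a multiple of 1010 (including batch 0), so they rarely run in the same microbatch.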

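import org.apache.spark.sql.DataFrame

// Stream from the source Delta table.
val df = spark.readStream.format("delta").table("<source-table-name>")
df.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()
    // Compact small files in the sink on every 10th microbatch.
    if (batchId % 10 == 0) { spark.sql("optimize <table-name>") }
    // Z-order the sink on every 101st microbatch.
    if (batchId % 101 == 0) { spark.sql("optimize <table-name> zorder by (<zorder-column-name>)") }
    // Append the current microbatch to the Delta sink.
    batchDF.write.format("delta").mode("append").saveAsTable("<table-name>")
    batchDF.unpersist() // release the cached microbatch once it is written
  }
  .outputMode("update")
  .start()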

You can modify the mod value as appropriate for your structured streaming application.
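When choosing a mod value, it can help to translate it into wall-clock time. The sketch below is a rough estimate under the assumption that each microbatch takes about one trigger interval to complete; OptimizeCadence, timeBetweenRuns, and modValueFor are hypothetical helper names used only for illustration.

```scala
import scala.concurrent.duration._

object OptimizeCadence {
  // Approximate time between maintenance runs, assuming each microbatch
  // takes about one trigger interval.
  def timeBetweenRuns(triggerInterval: FiniteDuration, modValue: Long): FiniteDuration =
    triggerInterval * modValue

  // Smallest mod value that spaces maintenance runs at least `target` apart.
  def modValueFor(triggerInterval: FiniteDuration, target: FiniteDuration): Long = {
    require(triggerInterval.toMillis > 0, "trigger interval must be positive")
    math.max(1L, math.ceil(target.toMillis.toDouble / triggerInterval.toMillis).toLong)
  }
}

// With a 30-second trigger, a mod value of 10 runs optimize about every 5 minutes.
val every = OptimizeCadence.timeBetweenRuns(30.seconds, 10L)
// To run optimize roughly once an hour at the same trigger interval, use this mod value.
val hourlyMod = OptimizeCadence.modValueFor(30.seconds, 1.hour)
```

If microbatches regularly take longer than the trigger interval, the real spacing between runs is longer than this estimate.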