Optimize a Delta sink in a structured streaming application
You are using a Delta table as the sink for your structured streaming application and you want to optimize the Delta table so that queries are faster.
If your structured streaming application has a very frequent trigger interval, it may not create sufficient files that are eligible for compaction in each microbatch.
The autoOptimize operation compacts files to 128 MB. An explicit optimize operation compacts Delta Lake files to 1 GB.
If you do not have a sufficient number of eligible files in each microbatch, you should optimize the Delta table files periodically.
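As background, the autoOptimize behavior referenced above is controlled with Delta table properties. The following is a minimal sketch, assuming a hypothetical table named events; delta.autoOptimize.optimizeWrite and delta.autoOptimize.autoCompact are the standard Databricks Delta table properties for auto optimize.
// Enable optimized writes and auto compaction on an existing Delta table.
// "events" is a placeholder table name.
spark.sql("""
  ALTER TABLE events
  SET TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact' = 'true'
  )
""")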
Use foreachBatch with a mod value
One of the easiest ways to periodically optimize the Delta table sink in a structured streaming application is to use foreachBatch with a mod value on the microbatch batchId.
Assume that you have a streaming DataFrame that was created from a Delta table. You use foreachBatch when writing the streaming DataFrame to the Delta sink.
Within foreachBatch, the mod value of the batchId is used so that the optimize operation is run once every 10 microbatches and the zorder operation is run once every 101 microbatches. (Because 10 and 101 are coprime, the two maintenance operations only rarely fall on the same microbatch.)
import org.apache.spark.sql.DataFrame

val df = spark.readStream.format("delta").table("<table-name>")

df.writeStream.format("delta")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Cache the microbatch so it is not recomputed by the write below.
    batchDF.persist()
    // Compact small files once every 10 microbatches.
    if (batchId % 10 == 0) { spark.sql("optimize <table-name>") }
    // Z-order the table once every 101 microbatches.
    if (batchId % 101 == 0) { spark.sql("optimize <table-name> zorder by (<zorder-column-name>)") }
    batchDF.write.format("delta").mode("append").saveAsTable("<table-name>")
    // Release the cached microbatch.
    batchDF.unpersist()
  }.outputMode("update")
  .start()
You can modify the mod value as appropriate for your structured streaming application.
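For example, here is a minimal sketch of deriving a mod value, assuming a hypothetical 30-second processing-time trigger and a goal of compacting roughly once per hour:
// Hypothetical values; adjust to your trigger interval and maintenance cadence.
val triggerIntervalSeconds = 30     // e.g. Trigger.ProcessingTime("30 seconds")
val optimizeIntervalSeconds = 3600  // target: compact about once per hour
val optimizeEveryNBatches = optimizeIntervalSeconds / triggerIntervalSeconds  // = 120
You would then use optimizeEveryNBatches in place of 10 in the example above.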