Checkpoint files not being deleted when using foreachBatch()

Learn how to prevent foreachBatch() checkpoint files from using a large amount of storage.

Written by Adam Pavlacka

Last published at: May 19th, 2022

Problem

You have a streaming job using foreachBatch() to process DataFrames.

%scala

streamingDF.writeStream.outputMode("append").foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format("parquet").mode("overwrite").save(output_directory)
}.start()

Checkpoint files are being created, but are not being deleted.

You can verify the problem by looking in the /local_disk0/tmp/ folder on the driver node. Checkpoint files remain in the folder.
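For example, a minimal sketch of how you could list the leftover directories from a notebook cell (this assumes the default checkpoint location has not been overridden; the file: prefix points dbutils.fs at the driver's local disk instead of DBFS):

%scala

// List the leftover temporary checkpoint directories on the driver's local disk.
dbutils.fs.ls("file:/local_disk0/tmp/").foreach(f => println(f.path))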

Cause

The foreachBatch() method supports DataFrame operations that are not normally available on streaming DataFrames by letting you apply them to the output of every micro-batch. The stream still requires a checkpoint directory to track its progress.
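For example, a minimal sketch (the sink paths are placeholders and streamingDF is the same streaming DataFrame as above) that writes each micro-batch to two different sinks, something a single streaming write cannot do directly:

%scala

streamingDF.writeStream.outputMode("append").foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // Cache the micro-batch so it is not recomputed for each of the two writes.
  batchDF.persist()
  // Apply ordinary batch writes to the micro-batch, one per sink.
  batchDF.write.format("parquet").mode("append").save("<parquet-output-path>")
  batchDF.write.format("delta").mode("append").save("<delta-output-path>")
  batchDF.unpersist()
}.start()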

If you have not specified a custom checkpoint location, a default checkpoint directory is created at /local_disk0/tmp/.

Databricks uses the checkpoint directory to ensure correct and consistent progress information. When a stream is shut down, either purposely or accidentally, the checkpoint directory allows Databricks to restart and pick up exactly where it left off.

If a stream is shut down by cancelling the stream from the notebook, the Databricks job attempts to clean up the checkpoint directory on a best-effort basis. If the stream is terminated in any other way, or if the job is terminated, the checkpoint directory is not cleaned up.

This behavior is by design.

Solution

To prevent checkpoint files from accumulating in /local_disk0/tmp/, manually specify the checkpoint directory with the checkpointLocation option.

%scala

streamingDF.writeStream.option("checkpointLocation","<checkpoint-path>").outputMode("append").foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format("parquet").mode("overwrite").save(output_directory)
}.start()
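With an explicit checkpointLocation, the checkpoint files are written to a path you control (typically a durable location such as DBFS or cloud object storage) instead of /local_disk0/tmp/, so you can monitor the directory and delete it yourself once the stream is no longer needed.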