Resumed streaming job fails after pause with StreamingQueryException error

Avoid pausing streaming jobs for longer than the delta.logRetentionDuration value, or restart the stream with a new checkpoint location.

Written by jayant.sharma

Last published at: December 26th, 2024

Problem

You resume a paused streaming job and it fails with a streaming query exception error message.

 

org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = ABC, runId = DEF] terminated with exception: The stream from your Delta table was expecting process data from version X, but the earliest available version in the _delta_log directory is Y. 

 

Cause

A streaming query exception can occur when the stream is paused for a time period that exceeds the delta.logRetentionDuration value.

Delta streaming reads the files in the /offsets and /commits directories under the checkpoint path to determine the last processed Delta table version and the next version to be processed.
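
For reference, a minimal sketch of inspecting those directories in a Databricks notebook (the checkpoint path is a placeholder) might look like this:

# Sketch: inspect a streaming checkpoint to see the Delta source version it recorded.
checkpoint_path = "dbfs:/checkpoints/my_stream"  # hypothetical location

# Offset files are named after the micro-batch ID; the most recent one records
# the source table version the next batch expects to read.
offset_files = [f for f in dbutils.fs.ls(f"{checkpoint_path}/offsets") if f.name.isdigit()]
latest_offset = max(offset_files, key=lambda f: int(f.name))

# For a Delta source, the offset JSON carries the recorded table version
# (typically a reservoirVersion field).
print(dbutils.fs.head(latest_offset.path))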

Consider a scenario where a Delta table, denoted as T1, serves as the stream's source, and the offset data shows that the last version processed was X. If the stream is halted and then reinitiated after a 30-day hiatus, it fails because the default delta.logRetentionDuration is 30 days, so Delta version logs older than that period have already been purged. Upon restarting, the stream attempts to resume from version X, which is no longer present in the _delta_log directory, and the query terminates with the error above.

 

Solution

You should avoid pausing streaming jobs for long periods of time. Ideally, never pause a job for longer than the delta.logRetentionDuration value. If the stream must be paused for a duration surpassing the delta.logRetentionDuration, restart the stream with a new checkpoint location.

Restarting the stream with a new checkpoint location forces it to reprocess all source data, which can result in duplicate data at the target sink. To avoid this, implement a deduplication strategy, such as filtering the data or using Delta merge operations, when writing into the Delta sink.
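
For illustration, the following sketch combines a fresh checkpoint location with a merge-based upsert in foreachBatch so that reprocessed rows update existing rows in the sink instead of creating duplicates. The table names, join key (id), and paths are placeholders, not values from this article.

from delta.tables import DeltaTable

# Placeholder names and paths; substitute your own tables and storage.
source_table = "catalog.schema.source_table"
target_table = "catalog.schema.target_table"
new_checkpoint = "dbfs:/checkpoints/target_table_v2"  # fresh checkpoint location

def upsert_to_target(microbatch_df, batch_id):
    # Merge on a business key so reprocessed source rows update existing
    # target rows rather than inserting duplicates.
    target = DeltaTable.forName(microbatch_df.sparkSession, target_table)
    (target.alias("t")
        .merge(microbatch_df.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(spark.readStream
    .format("delta")
    .table(source_table)
    .writeStream
    .foreachBatch(upsert_to_target)
    .option("checkpointLocation", new_checkpoint)
    .start())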

If you cannot restart the stream with a new checkpoint location, you can use the failOnDataLoss or the ignoreMissingFiles options.

  • Using the .option("failOnDataLoss", "false") within the readStream allows the stream to continue without interruption, automatically identifying a new delta version for processing.
  • Using the .option("ignoreMissingFiles", "true") within the readStream or the Apache Spark configuration spark.conf.set(“spark.sql.files.ignoreMissingFiles”, “true”) allows Spark to ignore any files from sources that are missing and continue without interruption.

 

Note

The failOnDataLoss and ignoreMissingFiles options should only be used as a last resort. They may lead to data loss, because a significant range of Delta logs could be missing and thus disregarded by the stream. In such cases, establish an alternative pipeline to reprocess the omitted data.
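
One hedged sketch of such a backfill, assuming the source still contains the affected rows and the target has a business key (the table names, the id key column, and the date filter are hypothetical), is a one-off batch read merged into the sink:

from delta.tables import DeltaTable

# One-off backfill sketch: batch-read the rows the stream skipped
# (filtered here by a hypothetical event_date column) and merge them in.
missed = (spark.read
          .table("catalog.schema.source_table")      # placeholder
          .where("event_date >= '2024-11-01'"))      # hypothetical gap window

target = DeltaTable.forName(spark, "catalog.schema.target_table")  # placeholder
(target.alias("t")
    .merge(missed.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())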

 

Preventative Measures

To avert these scenarios in the future, adjust the delta.logRetentionDuration table property according to your specific requirements. For example, if the stream needs to be paused for a period extending beyond 30 days, increase the delta.logRetentionDuration to 45 days or 60 days.
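
As an illustrative sketch (the table name is a placeholder), the property can be raised with an ALTER TABLE statement:

# Sketch: raise Delta log retention so a paused stream can resume after longer gaps.
spark.sql("""
    ALTER TABLE catalog.schema.source_table
    SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 60 days')
""")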