Problem
You resume a paused streaming job and it fails with a StreamingQueryException error message.
org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = ABC, runId = DEF] terminated with exception: The stream from your Delta table was expecting process data from version X, but the earliest available version in the _delta_log directory is Y.
Cause
A streaming query exception can occur when the stream is paused for a period of time that exceeds the delta.logRetentionDuration value.
Delta streaming accesses files within the /offsets and /commits directories located in the checkpoint path to determine the last processed Delta version as well as the next version to be processed.
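For illustration, here is a minimal sketch of how you could inspect the checkpoint to see which Delta version the stream will request next. The checkpoint path is an assumption, and the reservoirVersion field reflects the standard Structured Streaming offsets file layout for a Delta source; adjust the path to match your job.

```python
import json
import os

# Assumption: the checkpoint is accessible via a local/DBFS path.
checkpoint = "/dbfs/tmp/checkpoints/t1_stream"

# Offsets files are named by batch ID; the highest ID is the most recent.
latest = max(int(f) for f in os.listdir(f"{checkpoint}/offsets") if f.isdigit())

with open(f"{checkpoint}/offsets/{latest}") as fh:
    lines = fh.read().splitlines()

# The last line holds the Delta source offset JSON; reservoirVersion is
# the Delta table version the stream expects to process next.
offset = json.loads(lines[-1])
print("Next Delta version to process:", offset.get("reservoirVersion"))
```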
Consider a scenario where you have a Delta table, denoted as T1, serving as the stream's source. From the offset data, the last version processed was X. If the stream is halted and then restarted after a 30-day hiatus, it fails because the default delta.logRetentionDuration is 30 days, so Delta version logs older than this period have been purged. Upon restarting, the stream attempts to resume from version X, which is no longer present in the _delta_log directory, leading to the error.
Solution
You should avoid pausing streaming jobs for a long period of time. Ideally, you should never pause a job for longer than the delta.logRetentionDuration value. However, if the stream must be paused for longer than the delta.logRetentionDuration, you can restart the stream with a new checkpoint location.
Restarting the stream with a new checkpoint location compels the stream to reprocess all source data, potentially resulting in duplicate data at the target sink. To avoid this, implement a deduplication strategy, such as filtering already-written records or using Delta merge operations, when writing into the Delta sink, as shown in the sketch below.
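The following is a minimal sketch of this approach, assuming a Delta source table T1, a Delta sink at /delta/t1_sink, a business key column id, and a new checkpoint path; all of these names are illustrative.

```python
from delta.tables import DeltaTable

def upsert_batch(batch_df, batch_id):
    # MERGE on the business key deduplicates rows that are reprocessed
    # after the checkpoint reset.
    sink = DeltaTable.forPath(spark, "/delta/t1_sink")
    (sink.alias("t")
         .merge(batch_df.alias("s"), "t.id = s.id")
         .whenMatchedUpdateAll()
         .whenNotMatchedInsertAll()
         .execute())

(spark.readStream
      .format("delta")
      .table("T1")
      .writeStream
      .foreachBatch(upsert_batch)
      .option("checkpointLocation", "/checkpoints/t1_stream_v2")  # new location
      .start())
```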
If you cannot restart the stream with a new checkpoint location, you can use the failOnDataLoss or the ignoreMissingFiles options instead; both are shown in the sketch after the following list.
- Using .option("failOnDataLoss", "false") within readStream allows the stream to continue without interruption, automatically identifying a new Delta version for processing.
- Using .option("ignoreMissingFiles", "true") within readStream, or the Apache Spark configuration spark.conf.set("spark.sql.files.ignoreMissingFiles", "true"), allows Spark to ignore any missing source files and continue without interruption.
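A minimal sketch of both options, again assuming the illustrative source table T1:

```python
# Only use these options as a last resort; see the note below.
stream_df = (spark.readStream
                  .format("delta")
                  .option("failOnDataLoss", "false")    # continue past the purged versions
                  .option("ignoreMissingFiles", "true") # skip data files that no longer exist
                  .table("T1"))

# Equivalent session-wide setting for ignoreMissingFiles:
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
```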
Note
The failOnDataLoss and ignoreMissingFiles options should only be used as a last resort. They may lead to data loss, as a significant range of Delta logs could be missing and thus disregarded by the stream. In such cases, establishing an alternative pipeline to reprocess the omitted data is advisable.
Preventative Measures
To avert these scenarios in the future, adjust the delta.logRetentionDuration table property according to your specific requirements. If the stream needs to be paused for longer than 30 days, increase delta.logRetentionDuration to 45 or 60 days.
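For example, a sketch of raising the retention on the (illustrative) source table T1:

```python
# Assumption: the stream's source table is named T1. Choose an interval
# longer than the longest pause you expect.
spark.sql("""
    ALTER TABLE T1
    SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 60 days')
""")
```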