Problem
Your stream consuming from a Delta table fails and you receive the following error message.
org.apache.spark.SparkException: [FAILED_READ_FILE.DBR_FILE_NOT_EXIST] Error while reading file abfss://<storage>.dfs.core.windows.net/<path-to-file>/part-xxxx-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-c000.snappy.parquet. [DELTA_FILE_NOT_FOUND_DETAILED] File abfss://<storage>.dfs.core.windows.net/<path-to-file>/part-xxxx-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-c000.snappy.parquet referenced in the transaction log cannot be found.
Cause
If the file was not manually deleted, the issue may stem from the Delta table's last successful consumption time relative to the time of the VACUUM operation.

The VACUUM operation is triggered daily and removes files older than seven days: data files marked for deletion in the Delta transaction log are physically removed. When this happens, the latest offset file under the stream's checkpoint may still reference the deleted files, resulting in DBR_FILE_NOT_EXIST errors.

This is most common with Delta tables that include non-append operations, such as OPTIMIZE, combined with periodic VACUUM commands.
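To confirm this, you can compare the table history with the time the stream last consumed successfully. The following is a minimal sketch, assuming delta_table_path points at the source table; it lists recent VACUUM and OPTIMIZE entries from the Delta history (exact operation names can vary by Delta Lake and Databricks Runtime version).

from pyspark.sql import functions as F

history_df = spark.sql(f"DESCRIBE HISTORY delta.`{delta_table_path}`")

# Surface maintenance operations so their timestamps can be compared with
# the stream's last successful batch.
history_df.filter(F.col("operation").isin("VACUUM START", "VACUUM END", "OPTIMIZE")) \
    .select("version", "timestamp", "operation") \
    .orderBy(F.col("version").desc()) \
    .show(truncate=False)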
Solution
There are three options available. If possible, combining options two and three is best.
1. Consume the source table from scratch, essentially streaming from the Delta table as a new consumer to a new location with a new checkpoint. (A sketch follows the note below.)
Note
This method can sometimes create challenges for downstream consumers. For instance, if the consumer writes events to Kafka, it could result in duplicate events.
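A minimal sketch of this option, using placeholder paths for the source table, the new output location, and the new checkpoint. Reading without a starting version or an existing checkpoint makes the query behave as a brand-new consumer.

streaming_df = spark.readStream \
    .format("delta") \
    .load(delta_table_path)

# New output location and new checkpoint, so no stale offsets reference
# files that VACUUM has already removed.
query = streaming_df.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/<new-checkpoint-location-path>") \
    .start("/<new-output-table-path>")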
2. Force the consumer to skip missing versions by starting fresh and consuming from a specific version. (Do not delete the metadata files for missing versions. This results in a different error.)
Use the following code to set a new checkpoint location and read from the earliest available version of the data.
# Replace "20" with the earliest version of the table that is still available.
streaming_df = spark.readStream \
    .format("delta") \
    .option("startingVersion", "20") \
    .load(delta_table_path)

# Use a new checkpoint location so stale offsets are not reused.
query = streaming_df.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/<new-checkpoint-location-path>") \
    .start(output_delta_table_path)
3. If your data includes a timestamp field, reload the data from scratch using a filter based on a specific timestamp. Determine the timestamp when the consumer last successfully processed records, then restart consumption from scratch, applying a filter to process only records with a timestamp greater than or equal to that value. (A sketch for finding this timestamp follows the code below.)
# Filter to records at or after the last successfully processed timestamp.
streaming_df = spark.readStream \
    .format("delta") \
    .load(delta_table_path) \
    .filter("timestamp_field >= '<your-last-successful-consume-timestamp>'")

# Use a new checkpoint location so stale offsets are not reused.
query = streaming_df.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/<new-checkpoint-location-path>") \
    .start(output_delta_table_path)
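To find the last successfully processed timestamp, one option is to read it back from the previous output table. This is a minimal sketch, assuming the old output table at output_delta_table_path is intact and contains the same timestamp_field.

from pyspark.sql import functions as F

# Highest timestamp already written to the previous output table; assumes
# everything processed before the failure was successfully committed there.
last_ts = spark.read.format("delta") \
    .load(output_delta_table_path) \
    .agg(F.max("timestamp_field")) \
    .collect()[0][0]

print(last_ts)  # use this value in the filter above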