Delta table as a streaming source returns error DELTA_FILE_NOT_FOUND_DETAILED even though no user or lifecycle rule has deleted files

Consume the source table from scratch, consume from a specific version, or load data using a specific timestamp as filter.

Written by avi.yehuda

Last published at: February 27th, 2025

Problem

Your stream that consumes a Delta table as a source fails and you receive the following error message.

 

org.apache.spark.SparkException: [FAILED_READ_FILE.DBR_FILE_NOT_EXIST] Error while reading file abfss://<storage>.dfs.core.windows.net/<path-to-file>/part-xxxx-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-c000.snappy.parquet. [DELTA_FILE_NOT_FOUND_DETAILED] File abfss://<storage>.dfs.core.windows.net/<path-to-file>/part-xxxx-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-c000.snappy.parquet referenced in the transaction log cannot be found.

 

Cause

If the file was not manually deleted, the issue may stem from the timing of the stream's last successful consumption relative to the VACUUM operation.

The VACUUM operation is typically scheduled daily and removes data files older than the retention threshold, which defaults to seven days. Files marked as removed in the Delta transaction log are physically deleted. When this happens, the latest offset file under the stream's checkpoint may still reference the deleted files, resulting in DBR_FILE_NOT_EXIST errors.

This is most common with Delta tables that include non-append operations such as OPTIMIZE combined with periodic VACUUM commands. 
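
To confirm the timing, compare the stream's last successful run with the table's recent VACUUM and OPTIMIZE activity. The following is a minimal diagnostic sketch; it assumes the source table is addressed by path through a delta_table_path variable, matching the placeholder used in the code later in this article.

# Diagnostic sketch. delta_table_path is a placeholder for the source table path.
history_df = spark.sql(f"DESCRIBE HISTORY delta.`{delta_table_path}`")

# Show the most recent OPTIMIZE and VACUUM operations recorded in the table history.
history_df \
    .filter("operation IN ('VACUUM START', 'VACUUM END', 'OPTIMIZE')") \
    .select("version", "timestamp", "operation") \
    .orderBy("version", ascending=False) \
    .show(truncate=False)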

 

Solution

There are three options available. If you are able, combining options two and three is best.

 

1. Consume the source table from scratch, essentially streaming from the Delta table as a new consumer to a new location with a new checkpoint, as sketched after the following note.

 

Note

This method can sometimes create challenges for downstream consumers. For instance, if the consumer writes events to Kafka, it could result in duplicate events. 
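
The following is a minimal sketch of this from-scratch restart. The path variables are placeholders; the key point is that both the checkpoint location and the output location are new.

# Option 1 sketch: restart as a brand-new consumer of the source table.
streaming_df = spark.readStream \
    .format("delta") \
    .load(delta_table_path)

query = streaming_df.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/<new-checkpoint-location-path>") \
    .start("/<new-output-location-path>")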


2. Force the consumer to skip missing versions by starting fresh and consuming from a specific version. (Do not delete the metadata files for missing versions. This results in a different error.)

 

Use the following code to set a new checkpoint location and start reading from a specific version, for example the earliest version whose data files are still available.

 

streaming_df = spark.readStream \
    .format("delta") \
    .option("startingVersion", "20") \
    .load(delta_table_path)

query = streaming_df.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/<new-checkpoint-location-path>") \
    .start(output_delta_table_path)
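
If it is easier to reason about a point in time than a version number, the Delta streaming source also accepts a startingTimestamp option. The following sketch uses a placeholder timestamp; choose a point after the last VACUUM so that all referenced data files still exist.

# Alternative sketch using startingTimestamp instead of startingVersion.
# The timestamp value is a placeholder, for example a string such as 2025-01-01T00:00:00.000Z.
streaming_df = spark.readStream \
    .format("delta") \
    .option("startingTimestamp", "<starting-timestamp>") \
    .load(delta_table_path)

query = streaming_df.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/<new-checkpoint-location-path>") \
    .start(output_delta_table_path)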

 

3. If your data includes a timestamp field, load the data from scratch using a filter based on a specific timestamp. Determine the timestamp when the consumer last successfully processed records. Then, restart consumption from scratch, applying a filter to process only records with a timestamp greater than or equal to that value.

 

streaming_df = spark.readStream \
    .format("delta") \
    .load(delta_table_path) \
    .filter("timestamp_field >= '<your-last-successful-consume-timestamp>'")

query = streaming_df.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/<new-checkpoint-location-path>") \
    .start(output_delta_table_path)
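
If you are unsure which timestamp to use, a minimal sketch is to take the maximum timestamp already written to the previous output table and use it as the filter value. This assumes the previous output is still available at output_delta_table_path and contains the same timestamp_field as the source.

# Sketch for deriving the filter value from the previously written output table.
from pyspark.sql import functions as F

last_ts = spark.read \
    .format("delta") \
    .load(output_delta_table_path) \
    .agg(F.max("timestamp_field").alias("last_ts")) \
    .collect()[0]["last_ts"]

print(f"Restart the stream with the filter: timestamp_field >= '{last_ts}'")

Because the filter uses a greater-than-or-equal comparison, records that share that exact timestamp are processed again, so downstream consumers should be able to tolerate a small number of duplicates.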