Streaming application missing data from a Delta table when writing to a given destination

Restart the streaming query on a new checkpoint folder with the startingVersion option set to the next Delta version (X+1).

Written by gopinath.chandrasekaran

Last published at: September 23rd, 2024

Problem 

When you use a streaming application to read data from a Delta table and write it to a given destination, you notice data loss. 

Cause

Restarting a failed streaming job on a new checkpoint folder with startingVersion=latest trades possible data loss for a quick recovery. The restarted query reads only changes committed after the latest available Delta version of the source table, ignoring every version that already existed, including the versions the failed query had not yet processed. 

Example 

If the source Delta table has versions [0,1,2,3,4,5] and a streaming query fails while processing version 3, restarting with a new checkpoint folder and startingVersion=latest will cause the query to process from version 6 onwards, without going back to process versions [3,4,5].
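
For illustration, the following sketch shows the restart pattern that causes the gap. The table, destination, and checkpoint paths are placeholders, not values from an actual workload.

<pre class="language-python"># Hypothetical restart that loses data: a brand-new checkpoint folder combined
# with startingVersion=latest reads only commits made after the restart, so the
# unprocessed versions [3,4,5] from the example above are skipped.
(spark.readStream
  .format("delta")
  .option("startingVersion", "latest")
  .load("&lt;delta-table-path&gt;")
  .writeStream
  .format("delta")
  .option("checkpointLocation", "&lt;new-checkpoint-path&gt;")
  .start("&lt;destination-path&gt;"))</pre>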

Solution

Restart the streaming query on a new checkpoint folder as follows. 

 

  1. Identify the last version of the source table that the streaming job processed. You can get this information either from the offsets folder of the old checkpoint or from the last microbatch metrics. The following sample microbatch metrics show reservoirVersion (X in this example), which represents the Delta version the batch read. A sketch for reading this value directly from the old checkpoint folder follows the metrics. 

 

<pre class="language-html">"endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "aaaaa-aa-aa-aaaa",
      "reservoirVersion" : X,
      "index" : -1,
      "isStartingVersion" : false
    },</pre>

 

  2. Start the streaming job on the new checkpoint folder with the startingVersion option set to the next Delta version (X+1).

 

<pre class="language-python">spark.readStream.format("delta").option("startingVersion", "X+1").load("<delta-table-path>")    

 

  3. Alternatively, if the use case allows, restart the stream without the startingVersion option. The stream then processes a complete snapshot of the source table before picking up new changes, which is more suitable for merge workloads.
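
A minimal sketch of this variant; without startingVersion, the stream first processes the table's current snapshot and then continues with new changes (the path is a placeholder).

<pre class="language-python"># No startingVersion: the stream starts from the table's current snapshot,
# then processes subsequent commits incrementally.
spark.readStream.format("delta").load("&lt;delta-table-path&gt;")</pre>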