Problem
When using a streaming application to stream data from a Delta table and write to a given destination, you notice data loss.
Cause
A common workaround for a failed streaming job is to restart it on a new checkpoint folder with the startingVersion=latest option. The tradeoff is possible data loss: the restarted query reads only from the latest available Delta version of the source table, ignoring all earlier versions, including the ones the failed query never finished processing.
Example
If the source Delta table has versions [0,1,2,3,4,5] and a streaming query fails while processing version 3, restarting with a new checkpoint folder and startingVersion=latest causes the query to process from version 6 onwards, without going back to process versions [3,4,5].
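The following is a minimal sketch of the kind of restart that produces this gap. The table path, checkpoint location, and output path are hypothetical, and spark is assumed to be an active SparkSession.
<pre class="language-python"># Hypothetical paths; replace with your own.
source_path = "/delta/source-table"
new_checkpoint = "/checkpoints/stream-v2"   # new, empty checkpoint folder
output_path = "/delta/target-table"

# Restarting on a new checkpoint with startingVersion=latest:
# versions [3,4,5] that the failed query never finished processing are
# skipped, and only commits made after this restart are picked up.
(spark.readStream
      .format("delta")
      .option("startingVersion", "latest")
      .load(source_path)
      .writeStream
      .format("delta")
      .option("checkpointLocation", new_checkpoint)
      .start(output_path))</pre>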
Solution
Restart the streaming query on a new checkpoint folder.
- Identify the version of the source table that the streaming job last processed. You can get this information either from the old checkpoint offset folder (a sketch for reading the last offset file follows this list) or from the last microbatch metrics. Below are sample microbatch metrics, where reservoirVersion represents the Delta version that the batch read.
<pre class="language-html">"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "aaaaa-aa-aa-aaaa",
"reservoirVersion" : X,
"index" : -1,
"isStartingVersion" : false
},</pre>
- Start the streaming job on the new checkpoint folder with the startingVersion option pointing to the next Delta version (X+1).
<pre class="language-python"># Replace "X+1" with the actual next version number,
# for example 4 if reservoirVersion was 3.
spark.readStream.format("delta").option("startingVersion", "X+1").load("<delta-table-path>")</pre>
- If the use case allows, restart the stream without the startingVersion option to process a complete snapshot of the source table, which is more suitable for merge workloads.
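As referenced in the first step, the following is a minimal sketch of pulling the last processed reservoirVersion out of the old checkpoint's offset log. It assumes a single-source query and the standard Structured Streaming offset log layout (files named by batch ID under &lt;checkpoint&gt;/offsets, where the first line is the log version, the second is batch metadata, and each remaining line is one source's offset JSON), and that the checkpoint folder is readable as a local path (for example through a /dbfs mount). The paths are hypothetical.
<pre class="language-python">import json
import os

# Hypothetical old checkpoint location; replace with your own.
old_checkpoint = "/dbfs/checkpoints/stream-v1"
offsets_dir = os.path.join(old_checkpoint, "offsets")

# Pick the highest batch ID written to the offset log.
batch_ids = [int(name) for name in os.listdir(offsets_dir) if name.isdigit()]
latest_offset_file = os.path.join(offsets_dir, str(max(batch_ids)))

with open(latest_offset_file) as f:
    lines = f.read().splitlines()

# Line 0 is the log version (e.g. "v1"), line 1 is batch metadata;
# for a single-source query, line 2 is the Delta source offset JSON.
delta_offset = json.loads(lines[2])
last_version = delta_offset["reservoirVersion"]

print(f"Last Delta version read by the stream: {last_version}")
print(f"Restart with startingVersion = {last_version + 1}")</pre>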