Problem
You are using Apache Spark Structured Streaming to source data from a Kafka topic and write it to a Delta table sink, but challenges arise when attempting to reprocess data from the earliest offset in the topic. The stream is appropriately updated with the option "startingOffsets": "earliest"
and restarted. However, the streaming query fails to consume data from the intended earlier offset position. The issue persists, even after you have restarted the Databricks cluster associated with the streaming query.
Cause
If the option "startingOffsets": "earliest"
is set, and upon restarting the streaming query it still fails to consume from the earliest offset position, it may be due to previously running the query without this option. Specifically, if a streaming query is started without the "startingOffsets"
option, and has already completed at least one micro-batch, the “startingOffsets”
option may not take effect as intended.
For Structured Streaming queries with a Kafka source, the "startingOffsets"
option is only applicable when initiating a new streaming query. Structured Streaming manages offset consumption internally, through its checkpoint location, independent of the Kafka consumer. As a result, resuming or restarting a query uses the checkpoint location and disregards the "startingOffsets"
option.
Solution
To reprocess data from the beginning, you must use a new checkpoint directory for the streaming job. By utilizing a distinct checkpoint directory, you ensure that the streaming engine uses the specified "startingOffsets": "earliest"
option when initializing the streaming query, allowing it to consume data from the earliest offset position as intended.