Offset reprocessing issues in streaming queries with a Kafka source

Resolve Kafka offset reprocessing issues in Structured Streaming by using a new checkpoint directory.

Written by harikrishnan.kunhumveettil

Last published at: January 19th, 2024

Problem

You are using Apache Spark Structured Streaming to read data from a Kafka topic and write it to a Delta table sink. You want to reprocess the topic from its earliest available offset, so you update the stream with the option "startingOffsets": "earliest" and restart it. However, the streaming query does not consume data from the earlier offset position. The issue persists even after you restart the Databricks cluster associated with the streaming query.

Cause

If "startingOffsets": "earliest" is set and the restarted query still does not consume from the earliest offset, the query was most likely run earlier without this option. When "startingOffsets" is not set, a streaming query defaults to "latest". Once that query has completed at least one micro-batch, adding "startingOffsets" later has no effect.

For Structured Streaming queries with a Kafka source, the "startingOffsets" option applies only when a query starts for the first time. Structured Streaming tracks consumed offsets in its own checkpoint location rather than through Kafka consumer group offsets. As a result, when a query is resumed or restarted with the same checkpoint location, it continues from the offsets recorded in the checkpoint and ignores "startingOffsets".
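The rule above can be illustrated with a small, simplified model. This is a hypothetical sketch for explanation only, not Spark's actual implementation: the function `resolve_start_offsets` and the partition/offset values are invented. It shows that saved checkpoint progress always takes precedence, and that "startingOffsets" is consulted only when the checkpoint is empty.

```python
from typing import Dict, Optional


def resolve_start_offsets(
    checkpoint: Optional[Dict[int, int]],
    starting_offsets: str,
    earliest: Dict[int, int],
    latest: Dict[int, int],
) -> Dict[int, int]:
    """Hypothetical model: return the offset each partition is read from."""
    # Existing checkpoint progress wins: the query resumes where it left off
    # and the startingOffsets option is never consulted.
    if checkpoint:
        return dict(checkpoint)
    # Only a brand-new query (empty checkpoint) honors startingOffsets.
    if starting_offsets == "earliest":
        return dict(earliest)
    if starting_offsets == "latest":
        return dict(latest)
    raise ValueError(f"unsupported startingOffsets: {starting_offsets!r}")


# Invented example values for a two-partition topic.
earliest = {0: 0, 1: 0}
latest = {0: 500, 1: 480}

# First run without the option: the streaming default "latest" applies.
first = resolve_start_offsets(None, "latest", earliest, latest)

# After some micro-batches, the checkpoint holds the query's progress.
checkpoint = {0: 620, 1: 590}

# Restart with "earliest" but the SAME checkpoint: saved progress wins.
restart_same = resolve_start_offsets(checkpoint, "earliest", earliest, latest)

# Restart with "earliest" and a NEW (empty) checkpoint directory.
restart_new = resolve_start_offsets(None, "earliest", earliest, latest)

print(restart_same)  # {0: 620, 1: 590}
print(restart_new)   # {0: 0, 1: 0}
```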

Solution

To reprocess data from the beginning, start the streaming job with a new checkpoint directory. Because the new checkpoint contains no saved offsets, the query is treated as a new query and applies "startingOffsets": "earliest", consuming from the earliest offset still retained in the topic.
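A minimal PySpark-style sketch of this fix follows. It assumes an existing `SparkSession` is passed in as `spark`; the helper names `new_checkpoint_path` and `restart_from_earliest`, and every server, topic, table, and path argument, are hypothetical placeholders. The Kafka source options (`kafka.bootstrap.servers`, `subscribe`, `startingOffsets`) and the `checkpointLocation` writer option are standard Structured Streaming settings; `toTable` requires Spark 3.1 or later.

```python
import uuid


def new_checkpoint_path(base: str) -> str:
    # Hypothetical helper: build a unique, empty checkpoint subdirectory so
    # the restarted query has no saved offsets and honors startingOffsets.
    return f"{base.rstrip('/')}/{uuid.uuid4().hex}"


def restart_from_earliest(spark, bootstrap_servers, topic, table, checkpoint_base):
    # All arguments are placeholders; spark is an existing SparkSession.
    checkpoint = new_checkpoint_path(checkpoint_base)

    # With a fresh checkpoint, "earliest" takes effect for this new query.
    df = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")
        .load()
    )

    # Write to the Delta sink, tracking progress in the NEW checkpoint.
    query = (
        df.writeStream.format("delta")
        .option("checkpointLocation", checkpoint)
        .toTable(table)
    )
    return query, checkpoint
```

Stop the old query before starting the new one. Records that the old query already wrote to the Delta table are written again, so you may need to clear or deduplicate the target table, depending on your use case.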