Kafka client terminated with OffsetOutOfRangeException

Kafka client is terminated with `OffsetOutOfRangeException` when trying to fetch messages

Written by vikas.yadav

Last published at: June 1st, 2022

Problem

Your Apache Spark application is fetching messages from an Apache Kafka source when it terminates with a `kafkashaded.org.apache.kafka.clients.consumer.OffsetOutOfRangeException` error message.

Cause

Your Spark application is trying to fetch data at offsets that have expired in Kafka, meaning the retention policy has already deleted the corresponding messages.

We generally see this in these two scenarios:

Scenario 1

The Spark application is terminated while processing data. When it is restarted, it tries to fetch data starting from the previously calculated offsets stored in its checkpoint. If any of those offsets expired while the Spark application was offline, this issue can occur.

Scenario 2

Your retention policy is set to a shorter time than the time required to process the batch. By the time the batch is done processing, some of the Kafka partition offsets have expired. The offsets are calculated for the next batch, and if they no longer match the checkpoint metadata because of the expired offsets, this issue can occur.

Solution

Scenario 1 - Option 1

Delete the existing checkpoint before restarting the Spark application. A new checkpoint is created from the newly fetched offsets.

The downside to this approach is that some of the data may be missed, because the offsets have expired in Kafka.
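
As a minimal sketch of this step, assuming the checkpoint lives on a local or mounted filesystem path, the checkpoint directory can be removed before the restart. The function name is hypothetical. On Databricks, `dbutils.fs.rm(path, True)` is the usual equivalent for DBFS or cloud storage paths.

```python
import shutil
from pathlib import Path


def delete_checkpoint(checkpoint_dir: str) -> bool:
    """Remove a streaming checkpoint directory; return True if it existed."""
    path = Path(checkpoint_dir)
    if not path.exists():
        return False
    # Removing the whole directory discards the stored offsets, so the
    # restarted query no longer tries to resume from expired offsets.
    shutil.rmtree(path)
    return True
```

Stop the streaming query before calling this, for example `delete_checkpoint("/tmp/checkpoints/kafka_ingest")` with your own checkpoint location in place of the placeholder path.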

Scenario 1 - Option 2

Increase the retention period of the Kafka topic so that it is longer than the time the Spark application is offline.

No data is missed with this solution, because no offsets expire before the Spark application is restarted.

There are two types of retention policies:

  • Time-based retention - This type of policy defines the amount of time to keep a log segment before it is automatically deleted. The default time-based data retention window for all topics is seven days. You can review the Kafka documentation for `log.retention.hours`, `log.retention.minutes`, and `log.retention.ms` for more information.
  • Size-based retention - This type of policy defines the amount of data to retain in the log for each topic-partition. This limit is per-partition. This value is unlimited by default. You can review the Kafka documentation for `log.retention.bytes` for more information.

Info

If multiple retention policies are set, the more restrictive one takes precedence. This can be overridden on a per-topic basis.

Review Kafka’s Topic-level configuration for more information on how to set a per-topic override.
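
To make the sizing concrete, the following sketch converts an expected maximum downtime into a `retention.ms` value and builds the arguments for Kafka's `kafka-configs.sh` tool to apply it as a per-topic override. The function names, topic name, broker address, and safety factor are illustrative assumptions, not values from this article.

```python
from datetime import timedelta
from typing import List


def retention_ms_for(max_downtime: timedelta, safety_factor: float = 2.0) -> int:
    """Return a retention.ms value that outlasts the expected downtime."""
    return int(max_downtime.total_seconds() * 1000 * safety_factor)


def alter_retention_args(bootstrap_server: str, topic: str, retention_ms: int) -> List[str]:
    """Build a kafka-configs.sh argument list for a per-topic retention override."""
    return [
        "kafka-configs.sh",
        "--bootstrap-server", bootstrap_server,
        "--entity-type", "topics",
        "--entity-name", topic,
        "--alter",
        "--add-config", f"retention.ms={retention_ms}",
    ]


# Assumed example: the application may be offline for up to 3 days,
# so keep data for twice that long on the hypothetical topic "events".
args = alter_retention_args("localhost:9092", "events", retention_ms_for(timedelta(days=3)))
print(" ".join(args))
```

The safety factor is a design choice: it leaves headroom in case the application stays offline longer than planned, at the cost of more disk usage on the brokers.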

Scenario 2 - Option 1

Increase the retention period of the topic so that it is longer than the time required to process a batch. This is accomplished in the same way as the solution for Scenario 1 - Option 2.

Scenario 2 - Option 2

Increase the number of parallel workers by configuring `.option("minPartitions",<X>)` for `readStream`.

The option `minPartitions` defines the minimum number of partitions to read from Kafka. By default, Spark uses a one-to-one mapping of Kafka topic partitions to Spark partitions when consuming data from Kafka. If you set `minPartitions` to a value greater than the number of your Kafka topic partitions, Spark splits the Kafka topic partitions into smaller pieces.

This option is recommended during data skew, at peak loads, and when your stream is falling behind. Setting this value greater than the default results in the initialization of Kafka consumers at each trigger. This can impact performance if you use SSL when connecting to Kafka.
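
A minimal PySpark sketch of this option follows, assuming an existing `SparkSession` is passed in as `spark`. The helper names, broker address, topic name, and partition count are placeholders. The options are built as a plain dictionary so they can be inspected without a running cluster.

```python
def kafka_source_options(bootstrap_servers: str, topic: str, min_partitions: int) -> dict:
    """Build Kafka source options for a Structured Streaming read."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        # Ask Spark to split the Kafka topic partitions into at least
        # this many Spark partitions.
        "minPartitions": str(min_partitions),
    }


def read_kafka_stream(spark, options: dict):
    """Create a streaming DataFrame from Kafka using the given options."""
    return spark.readStream.format("kafka").options(**options).load()


# Placeholder values: for example, a topic with 8 Kafka partitions
# read with a minimum of 32 Spark partitions.
options = kafka_source_options("broker-1:9092", "events", 32)
```

In a notebook or job, `read_kafka_stream(spark, options)` would then return the streaming DataFrame to process.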