Incorrect numInputRows values even with row limits set per batch

Use endOffset instead of latestOffset to calculate the number of records read.

Written by sidhant.sahu

Last published at: September 12th, 2024

Problem

You are running an Apache Spark Structured Streaming query that reads data from Kafka when you encounter an issue with the numInputRows metric.

The numInputRows metric reports a significantly higher number of rows than expected, even when options such as maxOffsetsPerTrigger are set to limit the number of rows processed per batch.
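
A minimal sketch of the setup in question, assuming the Kafka connector is available (it is on Databricks clusters). The broker address, topic name, and checkpoint path are placeholders, and the noop sink is used only to keep the example self-contained.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("numInputRows-demo").getOrCreate()

# Read from Kafka with a per-batch row cap.
stream_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # placeholder broker
    .option("subscribe", "events")                     # placeholder topic
    .option("maxOffsetsPerTrigger", "1000")            # cap rows per micro-batch
    .load()
)

query = (
    stream_df.writeStream.format("noop")  # sink choice is illustrative only
    .option("checkpointLocation", "/tmp/checkpoints/noop-demo")
    .start()
)

# lastProgress is None until the first micro-batch completes. With the cap
# above you would expect numInputRows <= 1000, yet it can report a multiple
# of that when the symptom described here occurs.
if query.lastProgress is not None:
    print(query.lastProgress["numInputRows"])
```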

Cause

This behavior is related to the way actions are performed on the streaming DataFrame. When an action such as df.count() is called multiple times on the same stream, each action re-reads the source, so the numInputRows value multiplies and reports an inflated count. This is expected behavior: numInputRows counts the total number of records read by all actions in the code, not the number of unique records. In addition, using latestOffset instead of endOffset to calculate the number of records read can contribute to the discrepancy.
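
One common place for this pattern, assumed here for illustration, is a foreachBatch handler; the handler name, filter condition, and output path are hypothetical, and stream_df is the reader from the sketch above.

```python
# Each action below re-executes the plan against the Kafka source because
# the micro-batch DataFrame is not cached, so the same records are read
# (and counted toward numInputRows) once per action.
def process_batch(batch_df, batch_id):
    total_rows = batch_df.count()                                   # read 1
    non_null = batch_df.filter("value IS NOT NULL").count()         # read 2
    batch_df.write.format("delta").mode("append").save("/tmp/out")  # read 3
    print(f"batch {batch_id}: {total_rows} rows, {non_null} non-null")

query = (
    stream_df.writeStream.foreachBatch(process_batch)
    .option("checkpointLocation", "/tmp/checkpoints/foreach-demo")
    .start()
)
```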

Solution

  1. Ensure you use endOffset instead of latestOffset to calculate the number of records read. The correct calculation is the sum of (end offset - start offset) across all topic partitions; see the first sketch after this list.
  2. Review and minimize the number of actions performed on the streaming DataFrame to avoid counting the same data multiple times; see the second sketch after this list.
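
A sketch of the recommended calculation. It assumes a Kafka source, whose start and end offsets appear in each entry of query.lastProgress["sources"] as nested {topic: {partition: offset}} maps; the topic name and offset values below are made up.

```python
# Illustrative source entry shaped like query.lastProgress["sources"][0].
source_progress = {
    "startOffset": {"events": {"0": 1000, "1": 2000}},
    "endOffset":   {"events": {"0": 1500, "1": 2500}},
}

def rows_read(source):
    """Sum of (end offset - start offset) over every topic-partition."""
    total = 0
    for topic, partitions in source["endOffset"].items():
        for partition, end in partitions.items():
            start = source["startOffset"].get(topic, {}).get(partition, end)
            total += end - start
    return total

print(rows_read(source_progress))  # 500 + 500 = 1000 records actually read
```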
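
For step 2, one way to keep a needed count without re-reading the source is to persist the micro-batch DataFrame, run every action against the cached data, and unpersist afterward. This is a sketch of that approach, not the only possible fix; the output path remains a placeholder.

```python
# Revised foreachBatch handler: persist once so count() and the write are
# served from the cached micro-batch instead of re-reading Kafka.
def process_batch(batch_df, batch_id):
    batch_df.persist()
    try:
        total_rows = batch_df.count()  # served from cache, not a new read
        batch_df.write.format("delta").mode("append").save("/tmp/out")
        print(f"batch {batch_id}: {total_rows} rows written")
    finally:
        batch_df.unpersist()
```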