Append output is not supported without a watermark

Append output mode is not supported on aggregated DataFrames without a watermark.

Written by Adam Pavlacka

Last published at: May 17th, 2022

Problem

You are performing a streaming aggregation in append mode and an error message is returned:

Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark

Cause

You cannot use append mode on an aggregated DataFrame without a watermark. This is by design.
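The following minimal sketch shows how the error arises, assuming a hypothetical streaming DataFrame named words with schema { timestamp: Timestamp, word: String }. The aggregation has no watermark, so starting the query in append mode raises an AnalysisException with the message above.

%python

from pyspark.sql.functions import window

# Streaming aggregation with no watermark on the event-time column.
counts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word) \
    .count()

# Starting the query in append mode fails: without a watermark, Spark
# cannot determine when a window is final and safe to emit.
query = counts.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()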

Solution

You must apply a watermark to the DataFrame if you want to use append mode on an aggregated DataFrame.

The aggregation must have an event-time column, or a window on the event-time column.

The example code groups the data by window and word and computes the count of each group. .withWatermark() must be called on the same timestamp column that is used as the event-time column in the aggregation.

Replace the value <type> with the type of element you are processing. For example, use Row if you are processing rows.

Replace the value <words> with the streaming DataFrame of schema { timestamp: Timestamp, word: String }.

%java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.functions;

// Apply the watermark on the event-time column before grouping.
Dataset<type> windowedCounts = <words>
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
        words.col("word"))
    .count();

%python

from pyspark.sql.functions import window

# Apply the watermark on the event-time column before grouping.
windowedCounts = <words> \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()

%scala

import org.apache.spark.sql.functions.window
import spark.implicits._

// Apply the watermark on the event-time column before grouping.
val windowedCounts = <words>
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()

You must call .withWatermark() before you perform the aggregation. Attempting it afterwards fails with an error. For example, df.groupBy("time").count().withWatermark("time", "1 min") returns an exception.
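Once the watermark is applied before the aggregation, the query can be started in append mode. The following is a minimal sketch, assuming the windowedCounts DataFrame from the example above and a console sink as a stand-in for your output sink. Each window's count is emitted once, after the watermark passes the end of that window.

%python

# With the watermark in place, append mode is accepted.
query = windowedCounts.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()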

For more information, refer to the conditions for watermarking to clean aggregation state in the Apache Spark Structured Streaming documentation.
