Problem
You are performing an aggregation using append output mode and an exception is returned with the following error message:
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark
Cause
You cannot use append mode on an aggregated DataFrame without a watermark. This is by design.
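The pattern that triggers the error can be reproduced in a few lines. The sketch below is illustrative and not from the original article: it assumes the built-in rate test source and a memory sink, but any streaming aggregation without a watermark behaves the same way in append mode.

%python
# Minimal repro sketch (assumed setup): a streaming aggregation with no
# watermark, started in append output mode, fails at query start.
events = spark.readStream.format("rate").load()  # columns: timestamp, value

counts = events.groupBy("value").count()  # streaming aggregation, no watermark

# .start() raises an AnalysisException with the error message quoted above.
query = (counts.writeStream
    .outputMode("append")
    .format("memory")
    .queryName("counts")
    .start())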
Solution
You must apply a watermark to the DataFrame if you want to use append mode on an aggregated DataFrame.
The aggregation must have an event-time column, or a window on the event-time column.
Group the data by window and word, and compute the count of each group. .withWatermark() must be called on the same column as the timestamp column used in the aggregation. The example code below shows how to do this.
Replace the value <type> with the type of element you are processing. For example, you would use Row if you are processing by row.
Replace the value <words> with the streaming DataFrame of schema { timestamp: Timestamp, word: String }.
%java
Dataset<type> windowedCounts = <words>
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
        words.col("word"))
    .count();

%python
windowedCounts = <words> \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()

%scala
import spark.implicits._

val windowedCounts = <words>
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()
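For reference, the sketch below shows one way the placeholders might be filled in end to end. It is an illustration under stated assumptions, not part of the original example: the rate source, the derived word column, and the memory sink are hypothetical stand-ins for your actual stream of schema { timestamp: Timestamp, word: String }.

%python
from pyspark.sql.functions import window, expr

# Hypothetical words stream derived from the rate test source; replace
# with your real source of schema { timestamp: Timestamp, word: String }.
words = (spark.readStream.format("rate").load()
    .withColumn("word", expr("concat('word_', cast(value % 3 as string))"))
    .select("timestamp", "word"))

# Watermark on the same event-time column used in the window, applied
# before the aggregation.
windowedCounts = (words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word)
    .count())

# With the watermark in place, append mode starts successfully; each
# window's count is emitted once the watermark passes the window end.
query = (windowedCounts.writeStream
    .outputMode("append")
    .format("memory")
    .queryName("windowed_counts")
    .start())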
You must call .withWatermark() before you perform the aggregation. Calling it after the aggregation fails with an error. For example, df.groupBy("time").count().withWatermark("time", "1 min") returns an exception in append output mode.
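The difference in ordering looks like this in a short sketch; df and its event-time column time are placeholders, not from the article.

%python
# Invalid: the watermark is attached after the aggregation, so the
# aggregation itself runs without one. Starting this query in append
# output mode fails.
invalid = df.groupBy("time").count().withWatermark("time", "1 min")

# Valid: the watermark is applied to the event-time column before the
# aggregation, so append mode can emit each group once it is finalized.
valid = df.withWatermark("time", "1 min").groupBy("time").count()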
For more information, refer to the Apache Spark documentation on the conditions for watermarking to clean aggregation state.