Streaming job using foreachBatch fails with DUPLICATED_MAP_KEY error

Remove duplicate map keys.

Written by shanmugavel.chandrakasu

Last published at: December 21st, 2024

Problem

Your streaming job using the foreachBatch sink fails with a DUPLICATED_MAP_KEY error while executing batch DataFrame aggregation operations inside the foreachBatch function.

 

[ERROR] default_source: <microbatch-name> error Raw Stream Failed writing with mergeSchema to s3://XXX/ in epoch_id X on XXXX YYYY-MM-DD, Attempt to write without mergeSchema option. Error - An error occurred while calling o2464.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 39 in stage 141.0 failed 4 times, most recent failure: Lost task 39.3 in stage 141.0 (TID XXXX) (XX.XXX.XX.XX executor X): org.apache.spark.SparkRuntimeException: [DUPLICATED_MAP_KEY] Duplicate map key User was found, please check the input data.
If you want to remove the duplicated keys, you can set "spark.sql.mapKeyDedupPolicy" to "LAST_WIN" so that the key inserted at last takes precedence. SQLSTATE: 23505 
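
For reference, duplicate map keys commonly arise when maps are built or merged during aggregation. The following is a minimal sketch that reproduces the error under Spark's default dedup policy (spark.sql.mapKeyDedupPolicy = EXCEPTION); the column and key names are illustrative, not taken from the failing job.

from pyspark.sql import SparkSession
from pyspark.sql.functions import create_map, lit, map_concat

spark = SparkSession.builder.getOrCreate()

# Concatenating two maps that share the key "User" raises
# DUPLICATED_MAP_KEY under the default EXCEPTION policy.
df = spark.range(1).select(
    map_concat(
        create_map(lit("User"), lit("alice")),
        create_map(lit("User"), lit("bob")),
    ).alias("attributes")
)

df.show()  # fails unless the dedup policy is LAST_WIN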

 

The error still occurs despite:

  • Setting the Apache Spark configuration spark.sql.mapKeyDedupPolicy to LAST_WIN, both within and outside the foreachBatch function.
  • Adding the configuration as a key-value pair under Compute > Configuration > Advanced Spark config.

 

Cause

The spark.sql.mapKeyDedupPolicy setting is being applied to the wrong Spark session object. The configuration must be set on the Spark session associated with the streaming DataFrame within the foreachBatch function; a setting applied elsewhere does not affect the session that executes the micro-batch writes.
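
To illustrate the cause, the following sketch shows the ineffective pattern, assuming a hypothetical streaming_df and placeholder format and path: the policy is set on the driver's session object, which is not necessarily the session attached to the micro-batch DataFrame.

# Ineffective pattern (illustrative): the policy is set on the
# driver's session, not on the session that runs the batch write.
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")

def process_batch(df, epoch_id):
    # df.sparkSession may be a different session object, so the
    # policy set above does not apply here and the error persists.
    df.write.format("<your-format>").save("<your-target-path>")

streaming_df.writeStream.foreachBatch(process_batch).start()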

 

Solution

Use the following approach inside the foreachBatch function to ensure the deduplication policy is applied to the active session handling the streaming job.

 

def process_batch(df, epoch_id):
    # Get the Spark session attached to this micro-batch DataFrame
    spark = df.sparkSession
    # Apply the dedup policy to the session that executes the batch write
    spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
    df.write.format("<your-format>").save("<your-target-path>")
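
For context, a minimal sketch of how process_batch would be attached to the streaming query; streaming_df and the checkpoint path are placeholders, not part of the original article.

(streaming_df.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "<your-checkpoint-path>")
    .start())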

 

For more information on upserting from streaming queries using foreachBatch, review the Delta table streaming reads and writes (AWS | Azure | GCP) documentation.