Problem
While executing DataFrame aggregation operations within the forEachBatch function, your streaming job using the forEachBatch sink fails with a DUPLICATED_MAP_KEY error.
[ERROR] default_source: <microbatch-name> error Raw Stream Failed writing with mergeSchema to s3://XXX/ in epoch_id X on XXXX YYYY-MM-DD, Attempt to write without mergeSchema option. Error - An error occurred while calling o2464.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 39 in stage 141.0 failed 4 times, most recent failure: Lost task 39.3 in stage 141.0 (TID XXXX) (XX.XXX.XX.XX executor X): org.apache.spark.SparkRuntimeException: [DUPLICATED_MAP_KEY] Duplicate map key User was found, please check the input data.
If you want to remove the duplicated keys, you can set "spark.sql.mapKeyDedupPolicy" to "LAST_WIN" so that the key inserted at last takes precedence. SQLSTATE: 23505
The error still occurs despite:
- Setting the Apache Spark configuration spark.sql.mapKeyDedupPolicy to LAST_WIN, both within and outside the forEachBatch function (see the sketch after this list).
- Adding it as a key-value pair under Compute > Configuration > Advanced Spark config.
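For reference, the non-working attempt typically looks like the following sketch, where the policy is set on the driver-level session before the stream starts; the session name here is illustrative.

# Illustrative only: setting the policy on the outer (driver) session does not
# reach the session that executes each micro-batch inside forEachBatch, so the
# DUPLICATED_MAP_KEY error can still occur.
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")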
Cause
The spark.sql.mapKeyDedupPolicy setting is configured with an incorrect Spark session object. The configuration must be applied to the specific Spark session associated with the streaming DataFrame within the forEachBatch function.
Solution
Use the following approach inside the forEachBatch function to ensure the deduplication policy is applied to the active session handling the streaming job.
def process_batch(df, epoch_id):
    # Get the Spark session attached to this micro-batch's DataFrame,
    # not the driver-level session created outside the stream.
    spark = df.sparkSession
    # Apply the dedup policy on that session so the last value wins for duplicate map keys.
    spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
    # Write the micro-batch using your own format and target path.
    df.write.format("<your-format>").save("<your-target-path>")
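As a minimal sketch of how the handler is attached to the streaming query, assuming a streaming DataFrame named streaming_df and a placeholder checkpoint path; adapt the names to your own pipeline.

# Attach process_batch to the stream with the foreachBatch sink.
# streaming_df and the checkpoint path are placeholders.
query = (
    streaming_df.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "<your-checkpoint-path>")
    .start()
)
query.awaitTermination()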
For more information on upserting from streaming queries using foreachBatch, review the Delta table streaming reads and writes (AWS | Azure | GCP) documentation.