Structured streaming jobs slow down on every 10th batch

Automatic compaction of the metadata folder can slow down structured streaming jobs.

Written by gopinath.chandrasekaran

Last published at: October 28th, 2022

Problem

You are running a series of structured streaming jobs that write to a file sink. Every 10th batch runs slower than the preceding batches.

Cause

The file sink creates a _spark_metadata folder in the target path. This metadata folder stores information about each batch, including which files are part of the batch. This is required to provide an exactly-once guarantee for file sink streaming. By default, on every 10th batch, the metadata files for the previous nine batches are compacted into a single file at /<target-folder>/data/_spark_metadata/9.compact.
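You can confirm this behavior by listing the metadata folder. A minimal sketch using dbutils.fs.ls in a Databricks notebook, with '<target-path>' as a placeholder for your sink's actual output path:

%python

# List the file sink's metadata log. After the 10th batch you should see
# a 9.compact file that merges the entries for the earlier batches.
for entry in dbutils.fs.ls('<target-path>/_spark_metadata'):
    print(entry.path)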

Solution

There are three possible solutions. Choose the one that is most appropriate for your situation.

  • Option 1: Mitigates the issue in a production environment, with minimal code changes, but retains less metadata.
  • Option 2: Recommended if you can switch to using Delta tables. This is a good long-term solution.
  • Option 3: Recommended if the pipeline doesn't require exactly-once semantics, or if downstream consumers can handle duplicate records.

Option 1: Shorten metadata retention time

By default, the metadata folder grows larger over time. To mitigate this, you can set a maximum retention time for the output files. Files older than the retention period are automatically excluded from the metadata log, which limits the number of files in the metadata folder. Fewer files in the metadata folder means compaction takes less time.

Set the retention period when you write the streaming DataFrame to your file sink:

%python

check_point = '<checkpoint-folder-path>'
target_path = '<target-path>'
retention = '<retention-time>' # Provide the time as a string in hours or days, for example "12h" or "7d". Retention is disabled by default.

df.writeStream.format('json').outputMode('append').option('checkpointLocation', check_point).option('path', target_path).option('retention', retention).start()

Info

Retention defines the time to live (TTL) for output files. Output files committed longer ago than the TTL are excluded from the metadata log, and reads of the sink's output directory will not process any files older than the TTL.
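For example, batch reads of the sink's output directory go through the metadata log, so expired files are skipped automatically. A minimal sketch, assuming the JSON sink from Option 1 wrote to '<target-path>':

%python

# Spark detects the _spark_metadata folder in the path and reads through it,
# so files older than the configured retention are not returned.
df_out = spark.read.format('json').load('<target-path>')
display(df_out)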

Option 2: Use a Delta table as the sink

Delta tables do not use a _spark_metadata folder, and they still provide exactly-once semantics.

For more information, please review the documentation on using a Delta table as a sink (AWS | Azure | GCP). 
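For example, the streaming write from Option 1 can target a Delta sink instead. A minimal sketch, reusing the placeholder paths from above:

%python

# Writing to Delta avoids the _spark_metadata folder entirely; the Delta
# transaction log provides the exactly-once guarantee instead.
df.writeStream.format('delta').outputMode('append').option('checkpointLocation', check_point).start(target_path)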

Option 3: Use foreachBatch

foreachBatch does not create a _spark_metadata folder when writing to the sink.
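A minimal sketch of this approach, where write_batch is a hypothetical helper that writes each micro-batch to a placeholder '<target-path>':

%python

# Each micro-batch arrives as a regular DataFrame along with a batch ID.
# The batch write below does not create a _spark_metadata folder.
def write_batch(batch_df, batch_id):
    batch_df.write.format('json').mode('append').save('<target-path>')

df.writeStream.foreachBatch(write_batch).option('checkpointLocation', check_point).start()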


Warning

Exactly-once semantics are not supported with foreachBatch. Only use foreachBatch if you are certain that your application does not require exactly-once semantics.

This warning can be disregarded if you are writing to a Delta table.