You are running a series of structured streaming jobs and writing to a file sink. Every 10th run appears to be slower than the previous runs.
The file sink creates a _spark_metadata folder in the target path. This metadata folder stores information about each batch, including which files are part of the batch. This is required to provide an exactly-once guarantee for file sink streaming. By default, on every 10th batch, the metadata from the preceding batches is compacted into a single file, for example /<target-folder>/data/_spark_metadata/9.compact. Because each compaction rewrites all of the accumulated metadata, compaction takes longer as the stream runs, which is why every 10th batch is slower.
There are three possible solutions. Choose the one that is most appropriate for your situation.
- Option 1: Mitigates the issue in a production environment, with minimal code changes, but retains less metadata.
- Option 2: Recommended if you can switch to using Delta tables. This is a good long-term solution.
- Option 3: Recommended if the pipeline doesn't require exactly-once semantics or if downstream consumers can handle duplicates.
Option 1: Shorten metadata retention time
By default, the metadata folder grows without bound. To mitigate this, you can set a maximum retention time for the output files. Files older than the retention period are automatically excluded, which limits the number of entries in the metadata folder. Fewer entries in the metadata folder means compaction takes less time.
Set the retention period when you write the streaming DataFrame to your file sink:
```python
%python

check_point = '<checkpoint-folder-path>'
target_path = '<target-path>'

# Provide the retention value as a string in hours or days, for example "12h" or "7d".
# Retention is disabled by default.
retention = '<retention-time>'

(df.writeStream
  .format('json')
  .outputMode('append')
  .option('checkpointLocation', check_point)
  .option('path', target_path)
  .option('retention', retention)
  .start())
```
Option 2: Use a Delta table as the sink
Delta tables do not create a _spark_metadata folder and still provide exactly-once semantics.
For more information, please review the documentation on using a Delta table as a sink (AWS | Azure | GCP).
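As a minimal sketch, switching the sink only requires changing the format and target. This assumes the same streaming DataFrame `df` and placeholder paths used above, and a cluster with Delta Lake available:

```python
%python

# Write the stream to a Delta table instead of a JSON file sink.
# No _spark_metadata folder is created; the Delta transaction log
# provides the exactly-once guarantee instead.
(df.writeStream
  .format('delta')
  .outputMode('append')
  .option('checkpointLocation', '<checkpoint-folder-path>')
  .start('<target-path>'))
```

The checkpoint folder is still required; it is the Spark-side record of stream progress, independent of the sink's own transaction log.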
Option 3: Use foreachBatch
foreachBatch does not create a _spark_metadata folder when writing to the sink.
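As a minimal sketch, assuming the same streaming DataFrame `df` and placeholder paths as above: inside foreachBatch, each micro-batch is written with the ordinary batch writer, so no _spark_metadata folder is produced. Note that a failed batch may be retried, so the output can contain duplicates unless downstream handles them:

```python
%python

def write_batch(batch_df, batch_id):
    # batch_df is a regular (non-streaming) DataFrame for this micro-batch.
    # batch_id is a monotonically increasing ID that downstream systems
    # can use to deduplicate retried batches if needed.
    batch_df.write.mode('append').json('<target-path>')

(df.writeStream
  .foreachBatch(write_batch)
  .option('checkpointLocation', '<checkpoint-folder-path>')
  .start())
```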