Problem
You had a network issue (or similar) while a write operation was in progress. You are rerunning the job, but partially written, uncommitted files from the failed run are causing unwanted data duplication.
Cause
How Databricks commit protocol works:
- The DBIO commit protocol (AWS | Azure | GCP) is transactional. Files are only committed after a transaction completes successfully. If the job fails in the middle of a transaction, only a _started_<id> marker file and any partially written data files remain in the target path.
- When the job is rerun, a new _started_<id> file is created. Once the transaction completes successfully, a new _committed_<id> file is generated. This _committed_<id> file is a JSON file that lists all of the Parquet file names that downstream readers should use.
- If you read the folder using Apache Spark, there are no duplicates, because Spark only reads the files listed in the _committed_<id> file (see the listing sketch after this list).
- To delete the uncommitted data files from the target path, DBIO runs VACUUM at the end of every job. By default, uncommitted files older than 48 hours (2 days) are removed.
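To see these marker files in practice, you can list the output directory from a Databricks notebook. The following is a minimal sketch, assuming a hypothetical output path /mnt/demo/output written with the DBIO commit protocol; the exact fields inside the _committed_<id> JSON can vary by Databricks Runtime version.

# List the target directory and group marker files and data files (sketch).
files = dbutils.fs.ls("/mnt/demo/output")  # hypothetical path
committed = [f.name for f in files if f.name.startswith("_committed_")]
started = [f.name for f in files if f.name.startswith("_started_")]
parquet = [f.name for f in files if f.name.endswith(".parquet")]
print("committed:", committed)
print("started:", started)
print("parquet:", parquet)
# Inspect one _committed_<id> file; it lists the Parquet files readers should use.
if committed:
    print(dbutils.fs.head("/mnt/demo/output/" + committed[0]))

After a failed run you would typically see a _started_<id> file and some Parquet files but no matching _committed_<id> file; after a successful rerun, the new _committed_<id> file lists only the Parquet files that belong to the completed transaction.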
When the issue occurs:
- If you read the folder within two days of the failed job using another tool (one that does not use DBIO or Spark), or if you read the folder with a wildcard (spark.read.load('/path/*')), all of the files are read, including the uncommitted ones. This results in data duplication.
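The difference is easy to reproduce in a notebook. This is a sketch using the same hypothetical path; after a failed run followed by a rerun, the wildcard read can return more rows than the plain read because it also picks up uncommitted Parquet files.

# Reading the directory path lets Spark honor the commit protocol: only committed files are read.
df_committed = spark.read.load("/mnt/demo/output")  # hypothetical path
# Reading with a wildcard bypasses the _committed_<id> filtering and reads every Parquet file.
df_all = spark.read.load("/mnt/demo/output/*")
print(df_committed.count(), df_all.count())  # counts differ while uncommitted files are still present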
Solution
The ideal solution is to only use Spark or DBIO to access file storage.
If you must preserve access for other tools, you should update the value of spark.databricks.io.directoryCommit.vacuum.dataHorizonHours in your cluster's Spark config (AWS | Azure | GCP).
You can also update this property in a notebook:
spark.conf.set("spark.databricks.io.directoryCommit.vacuum.dataHorizonHours","<number-of-hours>")
This property determines which files are deleted when the automatic VACUUM runs at the end of every job. Any uncommitted file older than the specified time is removed.
The default value is 48 hours (2 days). You can reduce this to as little as one hour, depending on your specific needs. For example, if you set the value to one hour, the automatic VACUUM removes any uncommitted files older than one hour at the end of every job.
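For example, to clean up shortly after rerunning a failed job, you could lower the horizon to one hour before the write. A sketch, assuming a hypothetical DataFrame df and output path:

# Lower the vacuum horizon so the automatic end-of-job VACUUM removes recent uncommitted files.
spark.conf.set("spark.databricks.io.directoryCommit.vacuum.dataHorizonHours", "1")
df.write.mode("overwrite").parquet("/mnt/demo/output")  # hypothetical DataFrame and path
# At the end of this job, uncommitted files older than one hour are deleted from the target path.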
Alternatively, you can run VACUUM manually after rerunning a failed job, with a RETAIN <num> HOURS value low enough to remove the partially written, uncommitted files.
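For example, from a notebook (a sketch with a hypothetical path and a one-hour retention; adjust both to your environment):

# Manually vacuum the output directory to remove uncommitted files older than one hour.
spark.sql("VACUUM '/mnt/demo/output/' RETAIN 1 HOURS")  # hypothetical path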
Please review the VACUUM (AWS | Azure | GCP) documentation for more information: https://docs.databricks.com/spark/latest/spark-sql/language-manual/delta-vacuum.html