Problem
You are attempting to query a Delta table when you get an IllegalStateException error saying that the metadata could not be recovered.
Error in SQL statement: IllegalStateException: The metadata of your Delta table couldn't be recovered while Reconstructing version: 691193. Did you manually delete files in the _delta_log directory? Set spark.databricks.delta.stateReconstructionValidation.enabled to "false" to skip validation.
Cause
The Delta table is not accessible due to corrupted CRC or JSON files.
For example, you disabled multicluster writes (AWS | Azure | GCP) but you still have more than one cluster attempting to write to the same table, leading to corruption.
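If you are unsure whether multicluster writes are disabled on the clusters writing to the table, you can check the setting from a notebook. This is a minimal sketch; the config key shown is the standard multicluster writes setting and is an assumption about your workspace configuration.
%python
# Minimal check (assumption: the cluster exposes the standard multicluster writes config key).
# If this returns "false", make sure only one cluster writes to the table at a time.
print(spark.conf.get("spark.databricks.delta.multiClusterWrites.enabled", "not set"))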
Solution
These steps make the table readable and allow you to safely remove the corrupted Delta transaction files.
- Disable the validation config. This allows you to bypass the IllegalStateException error when you query the Delta table. This setting should not be used in production, but it is necessary here to access the corrupted version of the table. Any running cluster can be used for this process; a cluster restart is not required for the property change to take effect.
%sql set spark.databricks.delta.stateReconstructionValidation.enabled=False;
- Identify which version of the table is corrupted.
- Identify the latest Delta version of the table by checking the _delta_log folder.
%fs ls /delta-table-root-folder/_delta_log
- Run select * from <table-name>@v<latest> in a SQL notebook cell, where <latest> is the latest version number. You should see empty results because the latest version is corrupted. Keep running select * from <table-name>@v<latest-x> with decreasing version numbers until you find a version that is not corrupted (see the sketch after this list). For the purpose of this article, assume that versions 6 and 7 are corrupted. Version 5 is intact and has not been corrupted.
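If the table has many versions, you can probe them programmatically instead of running each query by hand. This is a minimal sketch, not part of the official procedure; <db>.<table-name> is a placeholder for your table, and the starting version number must be replaced with the highest version found in the _delta_log folder.
%python
# Minimal sketch: walk backwards from the latest version until one can be read.
# With validation disabled, a corrupted version returns empty results,
# so stop at the first version that returns rows.
latest_version = 7  # replace with the highest version number found in _delta_log
for v in range(latest_version, -1, -1):
    cnt = spark.sql(f"SELECT COUNT(*) AS c FROM <db>.<table-name>@v{v}").collect()[0]["c"]
    print(f"Version {v}: {cnt} rows")
    if cnt > 0:
        print(f"Version {v} appears intact")
        break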
- Make a backup of the Parquet files (data files) that were added to the table in versions 6 and 7.
This sample code copies the Parquet files to a backup folder.
%python
import os
import shutil

def get_parquet_files_list(corrupted_versions):
    '''
    Read the corrupted JSON commit files, identify the Parquet data files
    added in those versions, and return them as a list.
    '''
    final_list = []
    delta_table_location = '<delta-table-location>'  # prefix /dbfs not needed
    for file in corrupted_versions:
        json_path = delta_table_location + '/_delta_log/' + str(file)
        df = spark.read.json(json_path).select('add.path')
        target_list = list(df.select('path').toPandas()['path'])
        final_list.extend(target_list)
    return list(filter(None, final_list))

def copy_files(final_list, source_folder, backup_folder):
    '''
    Copy Parquet files from the source folder to the backup folder.
    '''
    i = 1
    for file_path in final_list:
        src_path = source_folder + file_path
        trg_path = backup_folder + file_path
        os.makedirs(os.path.dirname(trg_path), exist_ok=True)
        shutil.copy(src_path, trg_path)
        print("Done --> " + str(i))
        i = i + 1

def main():
    source_folder = '<delta-table-location>'  # prefix /dbfs is needed if the table location is in DBFS or a mount point
    backup_folder = '<backup-folder-storage-path>'  # prefix /dbfs is needed
    corrupted_versions = ['00000000000000000006.json', '00000000000000000007.json']  # enter the corrupted version JSON files here
    final_list = get_parquet_files_list(corrupted_versions)
    copy_files(final_list, source_folder, backup_folder)

if __name__ == "__main__":
    main()
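After the copy finishes, you can list the backup folder to confirm the files are there. A minimal sanity check, assuming the same backup path as above:
%python
# Quick sanity check: count the entries copied into the backup folder.
# Note: dbutils.fs paths omit the /dbfs prefix used by os/shutil above.
backup_files = dbutils.fs.ls('<backup-folder-storage-path>')
print(f"{len(backup_files)} entries in the backup folder")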
- Remove the CRC and JSON files for the corrupted versions of the table.
%fs rm /delta-table-root-folder/_delta_log/00000000000000000006.json
%fs rm /delta-table-root-folder/_delta_log/00000000000000000006.crc
%fs rm /delta-table-root-folder/_delta_log/00000000000000000007.json
%fs rm /delta-table-root-folder/_delta_log/00000000000000000007.crc
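Alternatively, you can remove the files programmatically. This is a minimal sketch using dbutils.fs.rm with the same corrupted versions as above; verify each path before running it, because the deletions cannot be undone.
%python
# Minimal sketch: remove the JSON and CRC files for the corrupted versions.
# Double-check the paths before running; these deletions are permanent.
delta_log_path = "/delta-table-root-folder/_delta_log/"
corrupted_versions = ["00000000000000000006", "00000000000000000007"]
for version in corrupted_versions:
    for ext in (".json", ".crc"):
        path = delta_log_path + version + ext
        print("Removing " + path)
        dbutils.fs.rm(path)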
- Run RESTORE TABLE to restore the Delta table to the most recent version that is not corrupted. In our example, this is version 5.
%sql RESTORE TABLE <table-name> TO VERSION AS OF 5
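After the restore, you can confirm that a RESTORE commit was written and that the table reads cleanly. A minimal check, reusing the <table-name> placeholder from the statement above:
%python
# Minimal check: the most recent entry in the table history should be the RESTORE operation.
display(spark.sql("DESCRIBE HISTORY <table-name> LIMIT 5"))
display(spark.sql("SELECT COUNT(*) FROM <table-name>"))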
- Now that the corrupted files have been removed, the table can be queried on any remaining version. To avoid data loss, append the Parquet files that you previously backed up. This ensures that any data added in the corrupted versions of the table is inserted into the restored version.
While appending, verify the following:
- Check whether the target table is partitioned. If it is, include the partitionBy option in the append statement.
- Confirm that all data types match the target table.
%python
from pyspark.sql.functions import col

backup_folder = '/backup-folder'
target_table_path = '/delta-table-root-folder/'

append_df = spark.read.format('parquet').load(backup_folder)
append_df_new = append_df.withColumn('col1', col('col1').cast('string'))  # cast to match the target table schema
append_df_new.write.format('delta').partitionBy('col2', 'col3', 'col4').mode('append').option('path', target_table_path).saveAsTable('db.tableName')
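To confirm that the backed-up data was appended, you can compare the row count of the backup folder with the table row count after the append. A minimal sketch, reusing the backup_folder variable and the db.tableName example from the cell above:
%python
# Minimal sketch: the number of rows in the backup Parquet files should be
# reflected in the table row count after the append.
backup_count = spark.read.format("parquet").load(backup_folder).count()
table_count = spark.sql("SELECT COUNT(*) AS c FROM db.tableName").collect()[0]["c"]
print(f"Rows in backup: {backup_count}, rows in table after append: {table_count}")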
- Re-enable validation checking.
%sql set spark.databricks.delta.stateReconstructionValidation.enabled=True;