Recover from a DELTA_LOG corruption error

Learn how to repair a Delta table that reports an IllegalStateException error when queried.

Written by gopinath.chandrasekaran

Last published at: February 17th, 2023

Problem

You are attempting to query a Delta table when you get an IllegalStateException error saying that the metadata could not be recovered.

Error in SQL statement: IllegalStateException:
The metadata of your Delta table couldn't be recovered while Reconstructing
version: 691193. Did you manually delete files in the _delta_log directory?
Set spark.databricks.delta.stateReconstructionValidation.enabled
to "false" to skip validation.

Cause

The Delta table is not accessible due to corrupted CRC or JSON files.

For example, you disabled multicluster writes (AWS | Azure | GCP) but you still have more than one cluster attempting to write to the same table, leading to corruption.

Solution


Warning

This solution is a workaround that allows you to recover the Delta table and make it readable. This solution assumes that your Delta table is append-only.

If your Delta table contains update operations, DO NOT use this solution. Open a ticket with Databricks Support instead.

These steps make the table readable again and allow you to safely remove the corrupted Delta transaction log files.

  1. Disable the validation config. This allows you to bypass the IllegalStateException error when you query the Delta table. It should not be used in production, but it is necessary to access the corrupted versions of the table. Any running cluster can be used for this step, and a cluster restart is not required for the change to take effect.
    %sql
    
    set spark.databricks.delta.stateReconstructionValidation.enabled=False;
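    If you prefer a Python cell, the same session-level configuration can be set with spark.conf.set. This is a minimal sketch; it only affects the current Spark session.
    %python
    
    # Session-level equivalent of the SQL SET above; pass "true" later to re-enable validation.
    spark.conf.set("spark.databricks.delta.stateReconstructionValidation.enabled", "false")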
  2. Identify which versions of the table are corrupted. 
    1. Identify the latest Delta version of the table by checking the _delta_log folder.
      %fs
      
      ls /delta-table-root-folder/_delta_log
    2. Run select * from <table-name>@v<latest> in a SQL notebook cell. You should see empty results, because the latest version is corrupted. Keep running select * from <table-name>@v<latest-x>, decreasing the version number each time, until you find a version that returns results. For the purpose of this article, assume that versions 6 and 7 are corrupted and that version 5 is intact.
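      If you prefer to probe versions programmatically, the following sketch (assuming the table lives at <delta-table-location>; the version range is illustrative) reads each candidate version with time travel and prints its row count:
      %python
      
      # Probe recent versions from newest to oldest. With stateReconstructionValidation
      # disabled, a corrupted version returns empty results. The path and version range
      # are placeholders for this example (versions 7 down to 5).
      delta_table_location = '<delta-table-location>'
      for v in range(7, 4, -1):
          cnt = (spark.read.format('delta')
                      .option('versionAsOf', v)
                      .load(delta_table_location)
                      .count())
          print(f"version {v}: {cnt} rows")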

  3. Make a backup of the Parquet files (data files) that were added to the table in versions 6 and 7.

    This sample code copies the Parquet files for the corrupted versions to a backup folder.
    %python
    
    import os
    import shutil
    
    
    def get_parquet_files_list(corrupted_versions):
      '''
      Takes the list of corrupted JSON version files, reads the add actions from each one,
      and returns the relative paths of the Parquet data files added in those versions.
      '''
      final_list = []
      delta_table_location = '<delta-table-location>' # prefix /dbfs not needed
      for file in corrupted_versions:
        json_path = delta_table_location + '/_delta_log/' + str(file)
        df = spark.read.json(json_path).select('add.path')
        target_list = list(df.select('path').toPandas()['path'])
        final_list.extend(target_list)
      return list(filter(None, final_list))
    
    
    def copy_files(final_list, source_folder, backup_folder):
      '''
      Copy the Parquet files from the source folder to the backup folder,
      preserving the relative (partition) directory structure.
      '''
      i = 1
      for file_path in final_list:
        src_path = os.path.join(source_folder, file_path)
        trg_path = os.path.join(backup_folder, file_path)
        os.makedirs(os.path.dirname(trg_path), exist_ok=True)
        shutil.copy(src_path, trg_path)
        print("Done --> " + str(i))
        i = i + 1
        
    def main():    
      source_folder = '<delta-table-location>' # prefix /dbfs is needed if the table location is in dbfs folder or in mount point 
      backup_folder = '<backup-folder-storage-path>' # prefix /dbfs is needed
      corrupted_versions = ['00000000000000000006.json','00000000000000000007.json'] # enter the corrupted versions json files here
      final_list = get_parquet_files_list(corrupted_versions)
      copy_files(final_list,source_folder,backup_folder)
      
    if __name__ == "__main__":
        main()
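    As a quick sanity check, a sketch like the following (assuming the backup folder is reachable through a /dbfs path) counts the files that were actually copied, which should match the last "Done -->" counter printed by copy_files:
    %python
    
    import os
    
    # Count every file that landed in the backup folder, including partition subfolders.
    backup_folder = '<backup-folder-storage-path>' # same /dbfs-prefixed path as in main()
    copied = sum(len(files) for _, _, files in os.walk(backup_folder))
    print(str(copied) + " files copied to the backup folder")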
  4. Remove the CRC and JSON files for the corrupted versions of the table.
    %fs
    
    rm /delta-table-root-folder/_delta_log/00000000000000000006.json
    rm /delta-table-root-folder/_delta_log/00000000000000000006.crc
    %fs
    
    rm /delta-table-root-folder/_delta_log/00000000000000000007.json
    rm /delta-table-root-folder/_delta_log/00000000000000000007.crc
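    If you prefer to remove the files from a Python cell, dbutils.fs.rm can be used instead of %fs. This is a sketch using the example version numbers:
    %python
    
    # Remove the JSON and CRC files for the corrupted versions (6 and 7 in this example).
    log_path = '/delta-table-root-folder/_delta_log/'
    for v in ['00000000000000000006', '00000000000000000007']:
        dbutils.fs.rm(log_path + v + '.json')
        dbutils.fs.rm(log_path + v + '.crc')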
  5. Run RESTORE TABLE to restore the Delta table to the most recent version that is not corrupted. In our example, this is version 5.
    %sql
    
    RESTORE TABLE <table-name> TO VERSION AS OF 5
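    The same restore can also be issued from a Python cell, either through spark.sql or, on recent Delta Lake runtimes, through the DeltaTable API. This is a sketch using the example table and version:
    %python
    
    # Restore the registered table to the last intact version (5 in this example).
    spark.sql("RESTORE TABLE <table-name> TO VERSION AS OF 5")
    
    # Alternatively, with the Delta Lake Python API (Delta Lake 1.2 and above):
    # from delta.tables import DeltaTable
    # DeltaTable.forName(spark, "<table-name>").restoreToVersion(5)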
  6. Now that the corrupted files have been removed, the table can be queried again. To avoid data loss, append the Parquet files that you previously backed up. This ensures that any data added in the corrupted versions is inserted into the restored table.

    While appending, please verify:
    1. Whether the target table is partitioned. If it is, include the partitionBy option in the append statement.
    2. That all data types match the target table schema.
      %python
      
      from pyspark.sql.functions import col
      
      backup_folder = '/backup-folder'
      target_table_path = '/delta-table-root-folder/'
      
      append_df = spark.read.format('parquet').load(backup_folder)
      
      append_df_new = append_df.withColumn('col1', col('col1').cast('string')) # casting to match the target table schema
      
      append_df_new.write.format('delta').partitionBy('col2','col3','col4').mode('append').option("path", target_table_path).saveAsTable("db.tableName")
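    If the append fails with a schema mismatch, a quick way to spot the offending column is to compare the two schemas. This sketch assumes append_df from the cell above and a target table registered as db.tableName:
      %python
      
      # Print both schemas side by side and look for columns whose types differ.
      append_df.printSchema()
      spark.table("db.tableName").printSchema()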
  7. Re-enable validation checking.
    %sql
    
    set spark.databricks.delta.stateReconstructionValidation.enabled=True;