Automate VACUUM metrics logging for Delta table cleanup audits

Parse the operationMetrics from the Delta transaction log using DESCRIBE HISTORY, and store this information in a dedicated Delta audit table.

Written by joel.robin

Last published at: June 4th, 2025

Introduction

After executing a VACUUM operation on a Delta table, you want to automate the extraction and logging of relevant metrics, such as the number of files deleted and the total size of data removed. 


Instructions

Parse the operationMetrics from the Delta transaction log using DESCRIBE HISTORY, and store this information in a dedicated Delta audit table for auditing or optimization validation purposes.
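If you want to confirm which fields the history exposes before building the pipeline, you can inspect the output directly. The following is a minimal sketch; the table name is a placeholder to replace with your own.

# Illustrative check: show the raw VACUUM entries and their metric maps.
spark.sql("DESCRIBE HISTORY <catalog>.<schema>.<your-table-to-be-vacuumed>") \
    .where("operation IN ('VACUUM START', 'VACUUM END')") \
    .select("version", "timestamp", "operation", "operationParameters", "operationMetrics") \
    .show(truncate=False)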

Follow these steps to automate the logging of VACUUM operations performed in Delta tables.

  1. Create a Delta audit table to store VACUUM metrics such as numFilesToDelete, sizeOfDataToDelete, and numDeletedFiles.
  2. Run the VACUUM command on the target Delta table. (Optionally preview the files first; see the dry-run sketch after this list.)
  3. Query DESCRIBE HISTORY for VACUUM operations (VACUUM START, VACUUM END). Note that numFilesToDelete and sizeOfDataToDelete appear on the VACUUM START entry, while numDeletedFiles appears on the VACUUM END entry.
  4. Track the maximum processed Delta version from the audit log.
  5. Filter new VACUUM entries using version > max_logged_version.
  6. Append only new rows to the audit table to avoid duplication.
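
Optionally, before running VACUUM for real, you can preview the files it would remove. The following is a minimal sketch using the DRY RUN clause, which lists candidate files without deleting anything; the table name is a placeholder to replace with your own.

# Optional preview: DRY RUN lists the files VACUUM would delete without
# removing anything. Replace the placeholder with your own table name.
spark.sql("VACUUM <catalog>.<schema>.<your-table-to-be-vacuumed> DRY RUN").show(truncate=False)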


Sample implementation snippet

The following code imports the needed PySpark functions, creates an audit table, performs the VACUUM operation on the required table, gets the max processed version, retrieves only the new history rows after the last logged version, selects the desired audit fields, and appends them to the audit table.

# Import the functions used below to build the metrics DataFrame
from pyspark.sql.functions import col, lit

# Create the audit table if it does not already exist
spark.sql("""
CREATE TABLE IF NOT EXISTS <catalog>.<schema>.<your-vacuum-audit-log> (
  table_name STRING,
  event_time TIMESTAMP,
  operation STRING,
  userName STRING,
  version BIGINT,
  status STRING,
  numFilesToDelete STRING,
  sizeOfDataToDelete STRING,
  numDeletedFiles STRING
) USING DELTA
""")



# Perform the VACUUM operation on the required table
spark.sql("VACUUM <catalog>.<schema>.<your-table-to-be-vacuumed>")


# Get the max version already logged for this table (-1 if none yet)
latest_logged_version = spark.sql("""
  SELECT COALESCE(MAX(version), -1) AS max_version
  FROM <catalog>.<schema>.<your-vacuum-audit-log>
  WHERE table_name = '<catalog>.<schema>.<your-table-to-be-vacuumed>'
""").collect()[0]["max_version"]

# Get only the new history rows after the last logged version
history_df = spark.sql(f"""
  SELECT
    timestamp AS event_time,
    operation,
    operationParameters,
    operationMetrics,
    userName,
    version
  FROM (DESCRIBE HISTORY <catalog>.<schema>.<your-table-to-be-vacuumed>)
  WHERE operation IN ('VACUUM START', 'VACUUM END')
    AND version > {latest_logged_version}
""")

# Select the desired audit fields. numFilesToDelete and sizeOfDataToDelete are
# populated on VACUUM START rows; status and numDeletedFiles on VACUUM END rows.
metrics_df = history_df.select(
    lit("<catalog>.<schema>.<your-table-to-be-vacuumed>").alias("table_name"),
    col("event_time"),
    col("operation"),
    col("userName"),
    col("version"),
    col("operationParameters")["status"].alias("status"),
    col("operationMetrics")["numFilesToDelete"].alias("numFilesToDelete"),
    col("operationMetrics")["sizeOfDataToDelete"].alias("sizeOfDataToDelete"),
    col("operationMetrics")["numDeletedFiles"].alias("numDeletedFiles")
)

# Write to audit table
metrics_df.write.mode("append").format("delta").saveAsTable("<catalog>.<schema>.<your-vacuum-audit-log>")
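
Once rows have been appended, you can verify the audit log and review recent cleanup activity. The following check is illustrative and assumes the audit table and column names used in the snippet above.

# Illustrative check: review the most recent VACUUM activity in the audit log.
spark.sql("""
  SELECT table_name, version, event_time, operation, status,
         numFilesToDelete, sizeOfDataToDelete, numDeletedFiles
  FROM <catalog>.<schema>.<your-vacuum-audit-log>
  ORDER BY event_time DESC
  LIMIT 10
""").show(truncate=False)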