Introduction
After executing a VACUUM
operation on a Delta table, you want to automate the extraction and logging of relevant metrics, such as the number of files deleted and the total size of data removed.
Instructions
Parse the operationMetrics
from the Delta transaction log using DESCRIBE HISTORY
, and store this information in a dedicated Delta audit table for auditing or optimization validation purposes.
Follow these steps to automate the logging of VACUUM
operations performed in Delta tables.
- Create a Delta audit table to store
VACUUM
metrics likenumFilesToDelete
,sizeOfDataToDelete
, andnumDeletedFiles
. - Run the
VACUUM
command on the target Delta table. - Query
DESC HISTORY
forVACUUM
operations (VACUUM START
,VACUUM END
). - Track the maximum processed Delta version from the audit log.
- Filter new
VACUUM
entries usingversion > max_logged_version
. - Append only new rows to the audit table to avoid duplication.
Sample implementation snippet
The following code creates an audit table, performs the VACUUM operation on required tables, gets the max processed version, retrieves only the new history rows after the last logged version, selects desired audit fields, and writes the fields to the audit table.
# Create audit table
spark.sql(f"""
CREATE TABLE IF NOT EXISTS <catalog>.<schema>.<your-vacuum-audit-log> (
table_name STRING,
event_time TIMESTAMP,
operation STRING,
userName STRING,
version BIGINT,
status STRING,
numFilesToDelete STRING,
sizeOfDataToDelete STRING,
numDeletedFiles STRING
) USING DELTA;
""")
# Perform the VACUUM operations on the required tables
spark.sql(f"VACUUM <catalog>.<schema>.<your-table-to-be-vacuumed>")
# Get max processed version
latest_logged_version = spark.sql(f"""
SELECT COALESCE(MAX(version), -1) as max_version
FROM <catalog>.<schema>.<your-vacuum-audit-log>
WHERE table_name = '<catalog>.<schema>.<your-table-to-be-vacuumed>'
""").collect()[0]["max_version"]
# Get only new history rows after last logged version
history_df = spark.sql(f"""
SELECT
timestamp AS event_time,
operation,
operationParameters,
operationMetrics,
userName,
version
FROM (DESC HISTORY `<catalog>.<schema>.<your-table-to-be-vacuumed>`)
WHERE operation IN ('VACUUM START', 'VACUUM END')
AND version > {latest_logged_version}
""")
# Select desired audit fields
metrics_df = history_df.select(
lit("<catalog>.<schema>.<your-table-to-be-vacuumed>").alias("table_name"),
col("event_time"),
col("operation"),
col("userName"),
col("version"),
col("operationParameters")["status"].alias("status"),
col("operationMetrics")["numFilesToDelete"].alias("numFilesToDelete"),
col("operationMetrics")["sizeOfDataToDelete"].alias("sizeOfDataToDelete"),
col("operationMetrics")["numDeletedFiles"].alias("numDeletedFiles")
)
# Write to audit table
metrics_df.write.mode("append").format("delta").saveAsTable("<catalog>.<schema>.<your-vacuum-audit-log>")