Problem
Delta Live Tables (DLT) provides an APPLY CHANGES API that simplifies change data capture (CDC) use cases. When you reference a field with a qualifier (also referred to as a nested field) in DLT APPLY CHANGES API arguments such as keys or sequence_by, you encounter an AnalysisException.
Example
dlt.apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["<key1>.<key2>"],
...
)
AnalysisException: The Column value must be a column identifier without any qualifier.
Cause
Using a qualifier such as key1.key2 is not supported in required arguments such as keys and sequence_by; these arguments accept only unqualified, top-level column identifiers.
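The restriction is on the identifier itself: a nested field must be promoted to a top-level column before it can serve as a key. As a conceptual illustration only (plain Python, hypothetical data, not the DLT implementation):

```python
# Hypothetical illustration: a nested field such as key1.key2 is not a
# top-level column name, so it must be promoted before use as a key.
record = {"key1": {"key2": "abc"}, "col1": 1}

# "key1.key2" does not exist as a top-level name in this record...
assert "key1.key2" not in record

# ...so promote the nested value to its own top-level column first.
flattened = {**record, "key2": record["key1"]["key2"]}
assert flattened["key2"] == "abc"
```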
Solution
Add a view with @dlt.view that extracts the desired columns using a Spark API such as select, withColumn, or withColumns before referencing them in APPLY CHANGES.
Example
In this simplified example, dlt_source is conceptually the Bronze layer. The dlt_source_view view is a logical layer on top of Bronze that extracts the nested field so it can be used with the APPLY CHANGES API. Last, dlt_target is the Silver layer of the medallion architecture.
from pyspark.sql.functions import col

dlt.create_streaming_table("dlt_target")

@dlt.view
def dlt_source_view():
    return (
        spark.readStream
        .format("delta")
        .table("dlt_source")
        .withColumns(
            # withColumns expects a mapping of column name -> Column,
            # so wrap the qualified field reference in col().
            {"<key2>": col("<key1>.<key2>")}
        )
    )
dlt.apply_changes(
target = "dlt_target",
source = "dlt_source_view",
keys = ["<key2>"],
sequence_by = "col1",
stored_as_scd_type = 1
)
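For reference, stored_as_scd_type = 1 keeps only the latest row per key, with "latest" determined by the sequence_by column. A minimal plain-Python sketch of that behavior (hypothetical data; an illustration of the semantics, not the DLT implementation):

```python
# Sketch of SCD type 1 semantics: keep only the most recent row per key,
# ordered by the sequence column (here col1). Hypothetical change feed.
changes = [
    {"key2": "a", "col1": 1, "value": "old"},
    {"key2": "a", "col1": 2, "value": "new"},
    {"key2": "b", "col1": 1, "value": "only"},
]

target = {}
for row in sorted(changes, key=lambda r: r["col1"]):
    # Later sequence values overwrite earlier ones for the same key.
    target[row["key2"]] = row

assert target["a"]["value"] == "new"   # updated in place, no history kept
assert target["b"]["value"] == "only"
```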
For more information on DLT views, refer to the DLT Python language reference (AWS | Azure | GCP) documentation.