Problem
When running a structured streaming job that reads data from a change data capture (CDC) table and joins it with another streaming table, you notice the job initially runs successfully for a few batches but then fails with the following error.
[STREAM_FAILED] Query [id = XXX, runId = XXX] terminated with exception: Job aborted due to stage failure: Task 0 in stage 18.0 failed 4 times, most recent failure: Lost task 0.3 in stage 18.0 (TID 953) (100.XX.YY.112 executor 11): org.apache.spark.sql.execution.streaming.state.StateSchemaNotCompatible: Provided schema doesn't match to the schema for existing state! Please note that Spark allow difference of field name: check count of fields and data type of each field. - Provided key schema: StructType(StructField(field0,StringType,true),StructField(index,LongType,true)) - Existing key schema: StructType(StructField(field0,StringType,true),StructField(index,LongType,true)) - Provided value schema: StructType(StructField(c_name_1,StringType,true),StructField(c_name_2,StringType,true),StructField(c_name_3,StringType,true),StructField(c_name_4,StringType,true),StructField(c_name_5,StringType,true),StructField(c_name_6,StringType,true),StructField(c_name_7,StringType,true)) - Existing value schema: StructType(StructField(c_name_1,StringType,false),StructField(c_name_2,StringType,true),StructField(c_name_3,StringType,true),StructField(c_name_4,StringType,true),StructField(c_name_5,StringType,true),StructField(c_name_6,StringType,true),StructField(c_name_7,StringType,true)) If you want to force running query without schema validation, please set spark.sql.streaming.stateStore.stateSchemaCheck to false. Please note running query with incompatible schema could cause indeterministic behavior.
Cause
The provided schema does not match the existing state schema. This can occur when the schema of the data read from the CDC table changes over time, creating a mismatch with the schema already persisted in the state store.
Specifically, the nullability of the `c_name_1` field changed: it was once `StructField(c_name_1,StringType,false)` and is now `StructField(c_name_1,StringType,true)`.
Solution
Set an Apache Spark configuration to avoid schema mismatch errors caused by a change in nullability. When set to true, the state schema checker does not validate the nullability of columns.
- Navigate to your cluster and open the settings.
- Click Advanced options.
- Under the Spark tab, in the Spark config box, enter the following code.
spark.databricks.streaming.stateStore.stateSchemaCheck.ignoreNullCompatibility true
Alternatively, set the same configuration in your notebook using the spark.conf.set() command.
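As a sketch, the notebook alternative looks like this. It assumes a Databricks notebook, where the `spark` SparkSession object is predefined, and must run before the streaming query starts:

```python
# In a Databricks notebook, `spark` (a SparkSession) is already defined.
# Disable only the nullability portion of the state schema check for this
# session; the rest of the schema validation (field count, data types)
# still applies.
spark.conf.set(
    "spark.databricks.streaming.stateStore.stateSchemaCheck.ignoreNullCompatibility",
    "true",
)
```

Setting the value via `spark.conf.set()` applies to the current session only; the cluster-level Spark config described in the steps above persists across restarts.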
Note
If the input schema previously marked a column as nullable but later changes the column to non-nullable, stateful operators may read older rows from the state store where the column's value is null. This may result in downstream output containing null values for a column that is expected to be non-nullable.