Append output is not supported without a watermark
Problem You are performing an aggregation using append mode and an exception error message is returned. Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark Cause You cannot use append mode on an aggregated DataFrame without a watermark. This is by design. Solution You must apply a...
Apache Spark DStream is not supported
Problem You are attempting to use a Spark Discretized Stream (DStream) in a Databricks streaming job, but the job is failing. Cause DStreams and the DStream API are not supported by Databricks. Solution Instead of using Spark DStream, you should migrate to Structured Streaming. Review the Databricks Structured Streaming in production (AWS | Azure | ...
Streaming with File Sink: Problems with recovery if you change checkpoint or output directories
When you stream data into a file sink, you should always change both checkpoint and output directories together. Otherwise, you can get failures or unexpected outputs. Apache Spark creates a folder inside the output directory named _spark_metadata. This folder contains write-ahead logs for every batch run. This is how Spark gets exactly-once guarant...
Get the path of files consumed by Auto Loader
When you process streaming files with Auto Loader (AWS | Azure | GCP), events are logged based on the files created in the underlying storage. This article shows you how to add the file path for every filename to a new column in the output DataFrame. One use case for this is auditing. When files are ingested to a partitioned folder structure there i...
How to restart a structured streaming query from last written offset
Scenario You have a stream, running a windowed aggregation query, that reads from Apache Kafka and writes files in Append mode. You want to upgrade the application and restart the query with the offset equal to the last written offset. You want to discard all state information that hasn’t been written to the sink, start processing from the earliest ...
Kafka error: No resolvable bootstrap urls
Problem You are trying to read or write data to a Kafka stream when you get an error message. kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer Caused by: kafkashaded.org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers If you are running a notebook, the error me...
readStream() is not whitelisted error when running a query
Problem You have table access control (AWS | Azure | GCP) enabled on your cluster. You are trying to run a structured streaming query and get and error message. py4j.security.Py4JSecurityException: Method public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.SQLContext.readStream() is not whitelisted on class class org.apache.s...
Checkpoint files not being deleted when using display()
Problem You have a streaming job using display() to display DataFrames. %scala val streamingDF = spark.readStream.schema(schema).parquet(<input_path>) display(streamingDF) Checkpoint files are being created, but are not being deleted. You can verify the problem by navigating to the root directory and looking in the /local_disk0/tmp/ folder. Ch...
Checkpoint files not being deleted when using foreachBatch()
Problem You have a streaming job using foreachBatch() to process DataFrames. %scala streamingDF.writeStream.outputMode("append").foreachBatch { (batchDF: DataFrame, batchId: Long) => batchDF.write.format("parquet").mode("overwrite").save(output_directory) }.start() Checkpoint files are being created, but are not being deleted. You can verify th...
Conflicting directory structures error
Problem You have an Apache Spark job that is failing with a Java assertion error java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Example stack trace Caused by: org.apache.spark.sql.streaming.StreamingQueryException: There was an error when trying to infer the partition schema of the current batch of files. Plea...
RocksDB fails to acquire a lock
Problem You are trying to use RocksDB as a state store for your structured streaming application, when you get an error message saying that the instance could not be acquired. Caused by: java.lang.IllegalStateException: RocksDB instance could not be acquired by [ThreadId: 742, task: 140.3 in stage 3152, TID 553193] as it was not released by [ThreadI...
Streaming job gets stuck writing to checkpoint
Problem You are monitoring a streaming job, and notice that it appears to get stuck when processing data. When you review the logs, you discover the job gets stuck when writing data to a checkpoint. INFO HDFSBackedStateStoreProvider: Deleted files older than 381160 for HDFSStateStoreProvider[id = (op=0,part=89),dir = dbfs:/FileStore/R_CHECKPOINT5/st...