Append output is not supported without a watermark
Problem You are performing an aggregation using append mode and an exception is returned with the error message: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark Cause You cannot use append mode on an aggregated DataFrame without a watermark. This is by design. Solution You must apply a...
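As a minimal sketch of the fix, assuming an event stream with an eventTime timestamp column (the built-in rate source below stands in for a real one): define a watermark before the aggregation so append mode can finalize and emit windows.

%scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Hypothetical event stream; the rate source stands in for a real one.
val events = spark.readStream
  .format("rate")
  .load()
  .withColumnRenamed("timestamp", "eventTime")

// Declare how late data may arrive, then aggregate.
val counts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window($"eventTime", "5 minutes"))
  .count()

// Append mode is now accepted: windows older than the watermark are finalized.
counts.writeStream
  .outputMode("append")
  .format("console")
  .start()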
Apache Spark DStream is not supported
Problem You are attempting to use a Spark Discretized Stream (DStream) in a Databricks streaming job, but the job is failing. Cause DStreams and the DStream API are not supported by Databricks. Solution Instead of using Spark DStream, you should migrate to Structured Streaming. Review the Databricks Structured Streaming in production (AWS | Azure | ...
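For orientation, a minimal Structured Streaming equivalent of a classic DStream socket word count (host and port are hypothetical):

%scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Read lines from a socket, as a DStream socketTextStream would.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split lines into words and count them.
val wordCounts = lines
  .select(explode(split($"value", " ")).as("word"))
  .groupBy("word")
  .count()

wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()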
Streaming with File Sink: Problems with recovery if you change checkpoint or output directories
When you stream data into a file sink, you should always change both checkpoint and output directories together. Otherwise, you can get failures or unexpected outputs. Apache Spark creates a folder inside the output directory named _spark_metadata. This folder contains write-ahead logs for every batch run. This is how Spark gets exactly-once guarant...
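A sketch of a correct relocation, assuming streamingDF is the streaming DataFrame and both paths are hypothetical: change the checkpoint and output directories in the same step.

%scala
streamingDF.writeStream
  .format("parquet")
  .option("checkpointLocation", "/mnt/new/checkpoint") // new checkpoint directory
  .option("path", "/mnt/new/output")                   // new output directory; _spark_metadata is created here
  .start()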
Get the path of files consumed by Auto Loader
When you process streaming files with Auto Loader (AWS | Azure | GCP), events are logged based on the files created in the underlying storage. This article shows you how to add the file path for every filename to a new column in the output DataFrame. One use case for this is auditing. When files are ingested to a partitioned folder structure there i...
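A sketch of the approach, assuming a JSON source and a predefined schema (paths hypothetical): input_file_name() records the full path of the file each row came from.

%scala
import org.apache.spark.sql.functions.input_file_name

val df = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema(schema) // `schema` is assumed to be defined elsewhere
  .load("/mnt/source/")
  .withColumn("filePath", input_file_name()) // source file path, e.g. for auditing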
How to set up Apache Kafka on Databricks
This article explains how to set up Apache Kafka on AWS EC2 machines and connect them to Databricks. The following high-level steps are required to create a Kafka cluster and connect to it from Databricks notebooks. Step 1: Create a new VPC in AWS When creating the new VPC, set its CIDR range to be different from the Databricks VPC CIDR range...
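Once the VPCs are peered and the brokers are reachable, a quick connectivity check from a notebook might look like this (broker address and topic are hypothetical):

%scala
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "ec2-12-34-56-78.compute-1.amazonaws.com:9092")
  .option("subscribe", "test-topic")
  .option("startingOffsets", "earliest")
  .load()

display(kafkaDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))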
Handling partition column values while using an SQS queue as a streaming source
Problem If data in S3 is stored by partition, the partition column values are used to name folders in the source directory structure. However, if you use an SQS queue as a streaming source, the S3-SQS source cannot detect the partition column values. For example, if you save the following DataFrame to S3 in JSON format: %scala val df = spark.range(1...
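One workaround, sketched below with a hypothetical queue URL and a hypothetical date=... partition layout: recover the partition value by parsing the source file path, since the S3-SQS source does not infer it.

%scala
import org.apache.spark.sql.functions._

val df = spark.readStream
  .format("s3-sqs")
  .option("fileFormat", "json")
  .option("queueUrl", "https://sqs.us-west-2.amazonaws.com/<account-id>/<queue-name>")
  .schema(schema) // `schema` is assumed to be defined elsewhere
  .load()
  // Parse the partition value out of the file path instead.
  .withColumn("date", regexp_extract(input_file_name(), "date=([^/]+)", 1))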
How to restart a structured streaming query from last written offset
Scenario You have a stream, running a windowed aggregation query, that reads from Apache Kafka and writes files in Append mode. You want to upgrade the application and restart the query with the offset equal to the last written offset. You want to discard all state information that hasn’t been written to the sink, start processing from the earliest ...
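A sketch of the restart, with hypothetical broker, topic, and offsets (in practice you would take the offsets of the last committed batch from the old checkpoint's offset log), combined with a fresh checkpoint directory so old state is discarded:

%scala
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1.example.com:9092")
  .option("subscribe", "events")
  // JSON map of topic -> partition -> offset to resume from.
  .option("startingOffsets", """{"events":{"0":1234,"1":5678}}""")
  .load()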
How to switch a SNS streaming job to a new SQS queue
Problem You have a Structured Streaming job running via the S3-SQS connector. You want to recreate the source SQS queue, backed by SNS data, and continue processing with a new queue in the same job and the same output directory. Solution Use the following procedure: Create new SQS queues and subscribe them to s3-events (from SNS). At...
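After the new queue is in place, pointing the job at it is a one-option change, sketched here with a hypothetical queue URL; the checkpoint and output directories stay the same.

%scala
val df = spark.readStream
  .format("s3-sqs")
  .option("fileFormat", "json")
  .option("queueUrl", "https://sqs.us-west-2.amazonaws.com/<account-id>/<new-queue-name>") // new queue
  .schema(schema) // `schema` is assumed to be defined elsewhere
  .load()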
Kafka error: No resolvable bootstrap urls
Problem You are trying to read or write data to a Kafka stream when you get an error message. kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer Caused by: kafkashaded.org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers If you are running a notebook, the error me...
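The usual fix is a bootstrap.servers value whose hosts all resolve from the cluster; a sketch with hypothetical broker names:

%scala
val df = spark.readStream
  .format("kafka")
  // Every host:port pair here must resolve from the cluster; a single
  // unresolvable name triggers "No resolvable bootstrap urls".
  .option("kafka.bootstrap.servers",
    "broker1.example.com:9092,broker2.example.com:9092")
  .option("subscribe", "my-topic")
  .load()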
readStream() is not whitelisted error when running a query
Problem You have table access control (AWS | Azure | GCP) enabled on your cluster. You are trying to run a structured streaming query and get an error message. py4j.security.Py4JSecurityException: Method public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.SQLContext.readStream() is not whitelisted on class class org.apache.s...
Checkpoint files not being deleted when using display()
Problem You have a streaming job using display() to display DataFrames. %scala val streamingDF = spark.readStream.schema(schema).parquet(<input_path>) display(streamingDF) Checkpoint files are being created, but are not being deleted. You can verify the problem by navigating to the root directory and looking in the /local_disk0/tmp/ folder. Ch...
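You can confirm the leftovers from a notebook, as sketched below, by listing the driver-local folder named above:

%scala
// List the driver-local temp folder where display() places its checkpoints.
display(dbutils.fs.ls("file:/local_disk0/tmp/"))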
Checkpoint files not being deleted when using foreachBatch()
Problem You have a streaming job using foreachBatch() to process DataFrames. %scala streamingDF.writeStream.outputMode("append").foreachBatch { (batchDF: DataFrame, batchId: Long) => batchDF.write.format("parquet").mode("overwrite").save(output_directory) }.start() Checkpoint files are being created, but are not being deleted. You can verify th...
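One mitigation, sketched with a hypothetical path, is to give the query an explicit checkpointLocation so the checkpoint files land in a directory you control and can clean up:

%scala
import org.apache.spark.sql.DataFrame

streamingDF.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/mnt/streaming/checkpoints/my-job") // explicit, managed location
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write.format("parquet").mode("overwrite").save(output_directory)
  }
  .start()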
Conflicting directory structures error
Problem You have an Apache Spark job that is failing with a Java assertion error java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Example stack trace Caused by: org.apache.spark.sql.streaming.StreamingQueryException: There was an error when trying to infer the partition schema of the current batch of files. Plea...
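If the input mixes partitioned and non-partitioned paths, one common remedy (sketched with hypothetical paths) is to set basePath so Spark knows where the partitioned table starts:

%scala
val df = spark.readStream
  .schema(schema) // `schema` is assumed to be defined elsewhere
  // Treat everything under basePath as one partitioned table.
  .option("basePath", "/mnt/data/events/")
  .parquet("/mnt/data/events/date=2021-01-01/")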
RocksDB fails to acquire a lock
Problem You are trying to use RocksDB as a state store for your structured streaming application, when you get an error message saying that the instance could not be acquired. Caused by: java.lang.IllegalStateException: RocksDB instance could not be acquired by [ThreadId: 742, task: 140.3 in stage 3152, TID 553193] as it was not released by [ThreadI...
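Since the lock is held per checkpoint, one precaution (a sketch; the query name and paths are hypothetical) is to stop any still-active run of the query before restarting it against the same checkpoint:

%scala
// Stop any active query still holding state for this checkpoint.
spark.streams.active
  .filter(_.name == "my-stateful-query")
  .foreach(_.stop())

val restarted = df.writeStream
  .queryName("my-stateful-query")
  .option("checkpointLocation", "/mnt/chk/my-stateful-query")
  .start("/mnt/out/my-stateful-query")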
Stream XML files using an auto-loader
Apache Spark does not include a streaming API for XML files. However, you can combine the Auto Loader features of the Spark batch API with the OSS library, Spark-XML, to stream XML files. In this article, we present a Scala-based solution that parses XML data using Auto Loader. Install Spark-XML library You must install the Spark-XML OSS library ...
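A condensed sketch of the approach (paths hypothetical; the Spark-XML library must be installed on the cluster): ingest raw XML with Auto Loader's binaryFile format, then parse each payload with Spark-XML's from_xml.

%scala
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._

// Infer the payload schema once from a static read of the same files.
val sample = spark.read.format("binaryFile")
  .load("/mnt/xml/")
  .select($"content".cast("string"))
  .as[String]
val payloadSchema = schema_of_xml(sample)

// Stream the files with Auto Loader and parse each payload.
val parsed = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("/mnt/xml/")
  .select(from_xml($"content".cast("string"), payloadSchema).as("parsed"))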
Streaming job using Kinesis connector fails
Problem You have a streaming job writing to a Kinesis sink, and it is failing with out-of-memory error messages. java.lang.OutOfMemoryError: GC Overhead limit exceeded java.lang.OutOfMemoryError: Java heap space. Symptoms include: Ganglia shows a gradual increase in JVM memory usage. Microbatch analysis shows input and processing rates are consisten...
Streaming job gets stuck writing to checkpoint
Problem You are monitoring a streaming job, and notice that it appears to get stuck when processing data. When you review the logs, you discover the job gets stuck when writing data to a checkpoint. INFO HDFSBackedStateStoreProvider: Deleted files older than 381160 for HDFSStateStoreProvider[id = (op=0,part=89),dir = dbfs:/FileStore/R_CHECKPOINT5/st...