Apache Spark UI is not in sync with job
Problem: The status of your Spark jobs is not correctly shown in the Spark UI. Some jobs that are confirmed to be in the Completed state are shown as Active/Running in the Spark UI. In some cases the Spark UI may appear blank. When you review the driver logs, you see an AsyncEventQueue warning. Logs: 20/12/23 21:20:26 ...
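The AsyncEventQueue warning generally means Spark listener events were dropped because the internal event queue filled up, so the UI never receives the job-end events. As a hedged sketch (the capacity value is an assumption, not a recommendation from this article), the queue can be enlarged in the cluster's Spark config before startup:

spark.scheduler.listenerbus.eventqueue.capacity 20000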
Apache Spark job fails with Parquet column cannot be converted error
Problem: You are reading data in Parquet format and writing to a Delta table when you get a Parquet column cannot be converted error message. The cluster is running Databricks Runtime 7.3 LTS or above. org.apache.spark.SparkException: Task failed while writing rows. Caused by: com.databricks.sql.io.FileReadException: Error while reading file s3://buc...
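This error usually surfaces when a column type in the Parquet files on disk (often a decimal) does not match the expected schema. One commonly cited workaround, shown here as a sketch rather than this article's confirmed fix, is to disable the vectorized Parquet reader so Spark falls back to the slower but more permissive row-based reader:

%scala
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")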
Best practice for cache(), count(), and take()
cache() is a lazy Apache Spark operation that can be used on a DataFrame, Dataset, or RDD when you want to perform more than one action. cache() caches the specified DataFrame, Dataset, or RDD in the memory of your cluster's workers. Because cache() is lazy, the caching operation takes place only when a Spark action (for example, count(),...
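A minimal sketch of the pattern the article describes (the path is hypothetical): mark the DataFrame for caching, materialize the cache with a full action such as count(), and only then use partial actions like take(). take(n) alone may scan only the partitions needed to produce n rows, leaving the rest uncached:

%scala
val df = spark.read.parquet("/tmp/example")  // hypothetical input path
df.cache()   // marks df for caching; nothing happens yet (lazy)
df.count()   // full action: scans every partition and populates the cache
df.take(10)  // now served from the cached data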
Cannot import timestamp_millis or unix_millis
Problem: You are trying to import timestamp_millis or unix_millis into a Scala notebook, but get an error message. %scala import org.apache.spark.sql.functions.{timestamp_millis, unix_millis} error: value timestamp_millis is not a member of object org.apache.spark.sql.functions import org.apache.spark.sql.functions.{timestamp_millis, unix_millis} Cau...
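These functions may exist in the Spark SQL layer even when they are not exposed in the Scala functions object, so one workaround, sketched under that assumption, is to call them through expr or selectExpr instead of importing them:

%scala
import org.apache.spark.sql.functions.expr
// Call the SQL function by name rather than importing a Scala symbol
val df = spark.range(1).select(expr("timestamp_millis(1609459200000)").as("ts"))
df.show(false)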
Cannot modify the value of an Apache Spark config
Problem: You are trying to SET the value of a Spark config in a notebook and get a Cannot modify the value of a Spark config error. For example: %sql SET spark.serializer=org.apache.spark.serializer.KryoSerializer Error in SQL statement: AnalysisException: Cannot modify the value of a Spark config: spark.serializer; Cause: The SET command does not wor...
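Configs like spark.serializer must be in place before the SparkContext starts, so they cannot be changed from a running notebook. A sketch of the alternative, assuming a Databricks cluster: add the setting to the cluster's Spark config (under the cluster's advanced options) so it is applied at startup:

spark.serializer org.apache.spark.serializer.KryoSerializer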
Convert flattened DataFrame to nested JSON
This article explains how to convert a flattened DataFrame to a nested structure by nesting a case class within another case class. You can use this technique to build a JSON file that can then be sent to an external API. Define nested schema: We’ll start with a flattened DataFrame. Using this example DataFrame, we define a custom nested schema usi...
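A minimal sketch of the case-class nesting technique (the Person/Address names and columns are illustrative, not from the article): map each flat row into the nested structure, then emit JSON:

%scala
case class Address(city: String, zip: String)
case class Person(name: String, address: Address)

import spark.implicits._
val flat = Seq(("Alice", "Springfield", "12345")).toDF("name", "city", "zip")
// Map each flat row into the nested case-class structure
val nested = flat.as[(String, String, String)].map { case (name, city, zip) =>
  Person(name, Address(city, zip))
}
nested.toJSON.show(false)  // {"name":"Alice","address":{"city":"Springfield","zip":"12345"}}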
Convert nested JSON to a flattened DataFrame
This article shows you how to flatten nested JSON using only the $"column.*" syntax and the explode method. Sample JSON file: Pass the sample JSON string to the reader. %scala val json = """ { "id": "0001", "type": "donut", "name": "Cake", "ppu": 0.55, "batters": { "batter": ...
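A sketch of the flattening approach against the donut sample, assuming batters.batter is an array of structs as the truncated snippet suggests: explode turns each array element into its own row, and column.* promotes the struct fields to top-level columns:

%scala
import org.apache.spark.sql.functions.explode
import spark.implicits._
val df = spark.read.json(Seq(json).toDS)  // json is the sample string above
val flattened = df
  .select($"id", $"type", $"name", explode($"batters.batter").as("batter"))
  .select($"id", $"type", $"name", $"batter.*")
flattened.show(false)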
Create a DataFrame from a JSON string or Python dictionary
This article reviews how to create an Apache Spark DataFrame from a variable containing a JSON string or a Python dictionary. Create a Spark DataFrame from a JSON string: Add the JSON content from the variable to a list. %scala import scala.collection.mutable.ListBuffer val json_content1 = "{'json_col1': 'hello', 'json_col2': 32...
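A condensed sketch of the Scala path: wrap the JSON string in a Dataset[String] and hand it to the JSON reader (double-quoted keys are used here; the single quotes in the snippet rely on Spark's lenient JSON parsing):

%scala
import spark.implicits._
val json_content1 = """{"json_col1": "hello", "json_col2": 32}"""
val df = spark.read.json(Seq(json_content1).toDS)
df.show()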
Decimal$DecimalIsFractional assertion error
Problem: You are running a job on Databricks Runtime 7.x or above when you get a java.lang.AssertionError: assertion failed: Decimal$DecimalIsFractional error message. Example stack trace: java.lang.AssertionError: assertion failed: Decimal$DecimalIsFractional while compiling: <notebook> during phase: globalPhase=terminal, enteringPhase=j...
from_json returns null in Apache Spark 3.0
Problem: The from_json function is used to parse a JSON string and return a struct of values. For example, if you have the JSON string [{"id":"001","name":"peter"}], you can pass it to from_json with a schema and get parsed struct values in return. %python from pyspark.sql.functions import col, from_json display( df.select(col('value'), from_json(c...
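In Spark 3.0, a JSON array no longer matches a plain struct schema, which is one common reason from_json starts returning null after an upgrade. A sketch of the fix under that assumption, wrapping the struct in an ArrayType (shown in Scala, while the snippet uses Python):

%scala
import spark.implicits._
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
// The schema must be an array of structs to match the JSON array input
val schema = ArrayType(StructType(Seq(
  StructField("id", StringType),
  StructField("name", StringType)
)))
val df = Seq("""[{"id":"001","name":"peter"}]""").toDF("value")
df.select(from_json($"value", schema).as("parsed")).show(false)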
Intermittent NullPointerException when AQE is enabled
Problem: You get an intermittent NullPointerException error when saving your data. Py4JJavaError: An error occurred while calling o2892.save. : java.lang.NullPointerException at org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin.$anonfun$getMapSizesForReduceId$1(OptimizeSkewedJoin.scala:167) at org.apache.spark.sql.execution.adaptive....
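The stack trace points into OptimizeSkewedJoin, which is part of adaptive query execution. As a hedged workaround sketch (not necessarily this article's resolution), the skew-join optimization, or AQE as a whole, can be switched off while the underlying issue is addressed:

%scala
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "false")
// or, more broadly, disable adaptive query execution entirely:
spark.conf.set("spark.sql.adaptive.enabled", "false")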
Manage the size of Delta tables
Delta tables are different from traditional tables. Delta tables include ACID transactions and time travel features, which means they maintain transaction logs and stale data files. These additional features require storage space. In this article we discuss recommendations that can help you manage the size of your Delta tables. Enable file system ve...
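As a sketch of the housekeeping levers involved (the table name and retention values are illustrative): VACUUM removes stale data files past the retention window, and table properties bound how long logs and deleted files are kept:

%scala
spark.sql("VACUUM my_delta_table RETAIN 168 HOURS")  // remove stale files older than 7 days
spark.sql("""
  ALTER TABLE my_delta_table SET TBLPROPERTIES (
    'delta.logRetentionDuration' = 'interval 30 days',
    'delta.deletedFileRetentionDuration' = 'interval 7 days'
  )
""")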
Trouble reading external JDBC tables after upgrading from Databricks Runtime 5.5
Problem: Attempting to read external tables via JDBC works fine on Databricks Runtime 5.5, but the same table reads fail on Databricks Runtime 6.0 and above. You see an error similar to the following: com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException: java.util.concurrent.ExecutionException: org.apache.spark.sql.AnalysisExc...
Select files using a pattern match
When selecting files, a common requirement is to read only specific files from a folder. For example, if you are processing logs, you may want to read files from a specific month. Instead of enumerating each file and folder to find the desired files, you can use a glob pattern to match multiple files with a single expression. This article uses examp...
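A minimal sketch of the idea (the path and format are illustrative): glob patterns such as * and ? are expanded directly inside the load path, so one expression selects all of a month's files:

%scala
// Matches every day-NN.json file under the December 2020 folder
val df = spark.read.format("json").load("/mnt/logs/2020/12/day-*.json")
df.count()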
Multiple Apache Spark JAR jobs fail when run concurrently
Problem: If you run multiple Apache Spark JAR jobs concurrently, some of the runs might fail with the error: org.apache.spark.sql.AnalysisException: Table or view not found: xxxxxxx; line 1 pos 48 Cause: This error occurs due to a bug in Scala. When an object extends App, its val fields are no longer immutable and they can be changed when the main met...
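The usual remedy, sketched here with hypothetical names, is to avoid the App trait in job entry points and use an explicit main method, so vals are initialized normally rather than through delayedInit:

%scala
// Fragile under concurrent runs: vals are initialized via delayedInit
// object MyJob extends App { val table = "events"; ... }

// Safer: an explicit main method
object MyJob {
  def main(args: Array[String]): Unit = {
    val table = "events"  // initialized before use, as expected
    // job logic here
  }
}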
Write a DataFrame with missing columns to a Redshift table
Problem: When writing to Redshift tables, if the target table has more columns than the source Apache Spark DataFrame, you may get a COPY error. The COPY failed with error: [Amazon][Amazon Redshift] (1203) Error occurred while trying to execute a query: ERROR: Load into table table-name failed. Check the 'stl_load_errors' system table for details. "12...
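One way to satisfy Redshift's COPY, sketched with a hypothetical column name and type, is to pad the DataFrame with typed null columns so its schema matches the target table before writing:

%scala
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType
// Add the column the target table has but the DataFrame lacks
val padded = df.withColumn("extra_col", lit(null).cast(StringType))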
Job fails with ExecutorLostFailure due to “Out of memory” error
Problem: Job fails with an ExecutorLostFailure error message. ExecutorLostFailure (executor <1> exited caused by one of the running tasks) Reason: Executor heartbeat timed out after <148564> ms Cause: The ExecutorLostFailure error message means one of the executors in the Apache Spark cluster has been lost. This is a generic error message ...
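When the underlying cause is executor memory exhaustion, a common first mitigation, sketched here, is to increase parallelism so each task holds less data at once (the partition count is illustrative and workload-dependent):

%scala
// More, smaller tasks -> lower per-task memory pressure on each executor
val repartitioned = df.repartition(400)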
Job fails with ExecutorLostFailure because executor is busy
Problem: Job fails with an ExecutorLostFailure error message. ExecutorLostFailure (executor <1> exited caused by one of the running tasks) Reason: Executor heartbeat timed out after <148564> ms Cause: The ExecutorLostFailure error message means one of the executors in the Apache Spark cluster has been lost. This is a generic error message ...
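When executors are alive but too busy (for example, stuck in a long garbage-collection pause) to send heartbeats in time, one hedged mitigation is to relax the heartbeat and network timeouts in the cluster's Spark config; the values below are illustrative, and spark.network.timeout must stay larger than the heartbeat interval:

spark.executor.heartbeatInterval 30s
spark.network.timeout 300s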
Understanding speculative execution
Speculative execution can be used to automatically re-attempt a task that is not making progress compared to other tasks in the same stage. This means that if one or more tasks in a stage are running slowly, they are re-launched. The task that completes first is marked as successful; the other attempt is killed. Implementatio...
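A sketch of the knobs involved, set in the cluster's Spark config (the values shown are Spark's documented defaults, not recommendations from this article): interval controls how often slow tasks are checked, multiplier how many times slower than the median a task must be, and quantile what fraction of tasks must finish before checking begins:

spark.speculation true
spark.speculation.interval 100ms
spark.speculation.multiplier 1.5
spark.speculation.quantile 0.75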
Use custom classes and objects in a schema
Problem: You are trying to create a dataset using a schema that contains Scala enumeration fields (classes and objects). When you run your code in a notebook cell, you get a ClassNotFoundException error. Sample code: %scala object TestEnum extends Enumeration { type TestEnum = Value val E1, E2, E3 = Value } import spark.implicits._ import TestEnum._ c...
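Classes defined in ordinary notebook cells are wrapped in synthetic objects that executors cannot always resolve. One workaround, sketched on the assumption that the article's fix follows similar lines, is to define the enumeration in a Databricks package cell (or compile it into a library JAR) so it gets a stable, resolvable class name:

%scala
package com.example.enums  // a package cell: the cell must contain only the packaged definitions

object TestEnum extends Enumeration {
  type TestEnum = Value
  val E1, E2, E3 = Value
}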