Append to a DataFrame
To append to a DataFrame, use the union method.

%scala
val firstDF = spark.range(3).toDF("myCol")
val newRow = Seq(20)
val appended = firstDF.union(newRow.toDF())
display(appended)

%python
firstDF = spark.range(3).toDF("myCol")
newRow = spark.createDataFrame([[20]])
appended = firstDF.union(newRow)
display(appended)
...
How to improve performance with bucketing
Bucketing is an optimization technique in Apache Spark SQL. Data is allocated among a specified number of buckets, according to values derived from one or more bucketing columns. Bucketing improves performance by shuffling and sorting data prior to downstream operations such as table joins. The tradeoff is the initial overhead due to shuffling and s...
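As a rough sketch (the table and column names below are illustrative, not from the article), a bucketed table is written through the DataFrame writer:

%scala
// Minimal sketch: write a bucketed, sorted table so downstream joins on `id`
// can avoid a shuffle. Table and column names are hypothetical.
import org.apache.spark.sql.functions.rand

val df = spark.range(1000000).withColumn("value", rand())
df.write
  .format("parquet")
  .bucketBy(32, "id")   // hash rows into 32 buckets by the id column
  .sortBy("id")         // sort rows within each bucket
  .mode("overwrite")
  .saveAsTable("bucketed_example")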
How to handle blob data contained in an XML file
If you log events in XML format, then every XML event is recorded as a base64 string. In order to run analytics on this data using Apache Spark, you need to use the spark_xml library and the BASE64DECODER API to transform the data for analysis.

Problem

You need to analyze base64-encoded strings from an XML-formatted log file using Spark. For example...
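As a hedged illustration only (the column name and sample value are invented, and this uses Spark's built-in unbase64 function rather than the BASE64DECODER API mentioned above), decoding a base64 column can look like this:

%scala
// Minimal sketch: decode a base64-encoded string column with Spark's built-in
// unbase64 function. Column name and sample value are hypothetical.
import org.apache.spark.sql.functions.{col, unbase64}
import spark.implicits._

val events = Seq("SGVsbG8gU3Bhcms=").toDF("payload")  // base64 for "Hello Spark"
val decoded = events.withColumn("decoded", unbase64(col("payload")).cast("string"))
display(decoded)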
Simplify chained transformations
Sometimes you may need to perform multiple transformations on your DataFrame:

%scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame

val testDf = (1 to 10).toDF("col")

def func0(x: Int => Int, y: Int)(in: DataFrame): DataFrame = {
  in.filter('col > x(y))
}

def func1(x: Int)(in: DataFrame): DataFrame = {
  in.sele...
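One common way to keep such chains readable (a sketch with illustrative function names, using the Dataset transform method) is shown below:

%scala
// Minimal sketch: chain DataFrame => DataFrame functions with transform so the
// pipeline reads top to bottom. Function names are illustrative.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import spark.implicits._

def keepLarge(threshold: Int)(in: DataFrame): DataFrame = in.filter(col("col") > threshold)
def addDoubled(in: DataFrame): DataFrame = in.withColumn("doubled", col("col") * 2)

val result = (1 to 10).toDF("col")
  .transform(keepLarge(5))
  .transform(addDoubled)
display(result)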
How to dump tables in CSV, JSON, XML, text, or HTML format
You want to send the results of computations run in Databricks outside of Databricks. You can use BI tools to connect to your cluster via JDBC and export results from the BI tools, or save your tables in DBFS or blob storage and copy the data via REST API. This article introduces JSpark, a simple console tool for executing SQL queries using JDBC on Spark ...
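As an alternative sketch (the table name and DBFS paths are hypothetical), you can also export a table directly with the DataFrame writer and then copy the files out:

%scala
// Minimal sketch: write a table out as CSV and JSON, then copy the files from
// DBFS via the REST API or the Databricks CLI. Names and paths are hypothetical.
val df = spark.table("my_database.my_table")

df.coalesce(1)
  .write.mode("overwrite")
  .option("header", "true")
  .csv("dbfs:/tmp/exports/my_table_csv")

df.write.mode("overwrite").json("dbfs:/tmp/exports/my_table_json")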
Get and set Apache Spark configuration properties in a notebook
In most cases, you set the Spark config (AWS | Azure) at the cluster level. However, there may be instances when you need to check (or set) the values of specific Spark configuration properties in a notebook. This article shows you how to display the current value of a Spark configuration property in a notebook. It also shows you how to set a new v...
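A minimal sketch of reading and setting a property from a notebook:

%scala
// Read the current value of a Spark configuration property, then set a new
// value for the current session.
val current = spark.conf.get("spark.sql.shuffle.partitions")
println(s"spark.sql.shuffle.partitions = $current")

spark.conf.set("spark.sql.shuffle.partitions", "200")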
Hive UDFs
This article shows how to create a Hive UDF, register it in Spark, and use it in a Spark SQL query. Here is a Hive UDF that takes a long as an argument and returns its hexadecimal representation.

%scala
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.io.LongWritable

// This UDF takes a long integer and converts it to a hexadecimal...
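As a hedged follow-on sketch (the function and class names are hypothetical, and the compiled UDF must already be on the cluster classpath), registration and use in Spark SQL look like this:

%scala
// Minimal sketch: register the compiled Hive UDF with Spark SQL and call it
// from a query. Function and class names are hypothetical.
spark.sql("CREATE TEMPORARY FUNCTION long_to_hex AS 'com.example.udf.LongToHex'")
spark.sql("SELECT long_to_hex(255) AS hex_value").show()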
Prevent duplicated columns when joining two DataFrames
If you perform a join in Spark and don’t specify your join correctly, you’ll end up with duplicate column names. This makes it harder to select those columns. This article and notebook demonstrate how to perform a join so that you don’t have duplicated columns.

Join on columns

If you join on columns, you get duplicated columns.

Scala

%scala
val llist...
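A minimal sketch of the column-name form of the join, which keeps a single copy of the join key (the example data is illustrative):

%scala
// Joining on a sequence of column names (rather than a join expression) keeps
// one copy of the join key in the output.
import spark.implicits._

val left = Seq((1, "a"), (2, "b")).toDF("id", "left_val")
val right = Seq((1, "x"), (2, "y")).toDF("id", "right_val")

val joined = left.join(right, Seq("id"))  // output has a single "id" column
display(joined)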
Revoke all user privileges
When user permissions are explicitly granted for individual tables and views, the selected user can access those tables and views even if they don’t have permission to access the underlying database. If you want to revoke a user’s access, you can do so with the REVOKE command. However, the REVOKE command is explicit, and is strictly scoped to the ob...
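A hedged sketch (the object and user names are hypothetical): because REVOKE is scoped per object, revoke on each table or view that was explicitly granted:

%scala
// Minimal sketch: revoke privileges on each securable object that was granted.
// Object and user names are hypothetical.
spark.sql("REVOKE ALL PRIVILEGES ON TABLE my_db.my_table FROM `user@example.com`")
spark.sql("REVOKE ALL PRIVILEGES ON VIEW my_db.my_view FROM `user@example.com`")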
How to list and delete files faster in Databricks
Scenario

Suppose you need to delete a table that is partitioned by year, month, date, region, and service. However, the table is huge, and there will be around 1000 part files per partition. You can list all the files in each partition and then delete them using an Apache Spark job. For example, suppose you have a table that is partitioned by a, b, ...
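A hedged sketch (the bucket name and table layout are hypothetical): list the partition directories once on the driver, then delete them in parallel on the executors with the Hadoop FileSystem API:

%scala
// Minimal sketch: delete partition directories in parallel with a Spark job
// instead of a single driver-side loop. Paths are hypothetical, and this assumes
// the executors' default Hadoop configuration can resolve them.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val basePath = new Path("s3a://my-bucket/my_table")
val fs = basePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
val partitionDirs = fs.listStatus(basePath).filter(_.isDirectory).map(_.getPath.toString)

spark.sparkContext.parallelize(partitionDirs, partitionDirs.length).foreach { dir =>
  val path = new Path(dir)
  // Build a Configuration on the executor; the driver's config is not serializable.
  val dirFs = path.getFileSystem(new Configuration())
  dirFs.delete(path, true)  // recursively delete one partition directory
}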
How to handle corrupted Parquet files with different schema
Problem

Let’s say you have a large list of essentially independent Parquet files, with a variety of different schemas. You want to read only those files that match a specific schema and skip the files that don’t match. One solution could be to read the files in sequence, identify the schema, and union the DataFrames together. However, this approach ...
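One approach, sketched under the assumption that mismatched files can be skipped with ignoreCorruptFiles (paths and schema are hypothetical):

%scala
// Minimal sketch: provide the target schema and enable ignoreCorruptFiles so
// files that cannot be read with that schema are skipped rather than failing
// the job. Path and schema are hypothetical.
import org.apache.spark.sql.types._

spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

val targetSchema = new StructType()
  .add("id", LongType)
  .add("name", StringType)

val df = spark.read.schema(targetSchema).parquet("dbfs:/mnt/data/parquet_files/")
display(df)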
No USAGE permission on database
Problem

You are using a cluster running Databricks Runtime 7.3 LTS and above. You have enabled table access control for your workspace (AWS | Azure | GCP) as the admin user, and granted the SELECT privilege to a standard user group that needs to access the tables. A user tries to access an object in the database and gets a SecurityException error me...
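A minimal sketch of the grants involved (database and group names are hypothetical):

%scala
// On Databricks Runtime 7.3 LTS and above, the group needs USAGE on the
// database in addition to SELECT on the table. Names are hypothetical.
spark.sql("GRANT USAGE ON DATABASE my_db TO `data-readers`")
spark.sql("GRANT SELECT ON TABLE my_db.my_table TO `data-readers`")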
Nulls and empty strings in a partitioned column save as nulls
Problem

If you save data containing both empty strings and null values in a column on which the table is partitioned, both values become null after writing and reading the table. To illustrate this, create a simple DataFrame:

%scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val data = Seq(Row(1, "")...
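A hedged sketch reproducing the behavior (the output path is hypothetical):

%scala
// Minimal sketch: partition on a column that contains both empty strings and
// nulls, then read the table back. Output path is hypothetical.
import spark.implicits._

val df = Seq((1, ""), (2, null.asInstanceOf[String]), (3, "a")).toDF("id", "part")
df.write.mode("overwrite").partitionBy("part").parquet("dbfs:/tmp/partition_null_demo")

// Both the empty string and the null come back as null in the partition column.
display(spark.read.parquet("dbfs:/tmp/partition_null_demo"))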
Behavior of the randomSplit method
When using randomSplit on a DataFrame, you could potentially observe inconsistent behavior. Here is an example:

%python
df = spark.read.format('inconsistent_data_source').load()
a,b = df.randomSplit([0.5, 0.5])
a.join(broadcast(b), on='id', how='inner').count()

Typically this query returns 0. However, depending on the underlying data source or input...
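One way to make the splits deterministic, sketched with a hypothetical source, is to cache and materialize the DataFrame before splitting:

%scala
// Minimal sketch: cache and materialize the DataFrame before randomSplit so the
// splits stay consistent across repeated actions. Source path is hypothetical.
val df = spark.read.parquet("dbfs:/mnt/data/events").cache()
df.count()  // materialize the cache

val Array(a, b) = df.randomSplit(Array(0.5, 0.5), seed = 42)
println(a.count() + b.count() == df.count())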
Job fails when using Spark-Avro to write decimal values to AWS Redshift
Problem

In Databricks Runtime versions 5.x and above, when writing decimals to Amazon Redshift using Spark-Avro as the default temp file format, either the write operation fails with the exception:

Error (code 1207) while loading data into Redshift: "Invalid digit, Value '"', Pos 0, Type: Decimal"

or the write operation writes nulls in place of the ...
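A hedged sketch of one workaround (connection values and table names are hypothetical): switch the temp file format from the default Avro to CSV:

%scala
// Minimal sketch: write to Redshift with a CSV temp format instead of the
// default Avro. Connection values and table names are hypothetical.
val df = spark.table("source_table")

df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshift-host:5439/dev?user=<user>&password=<pass>")
  .option("dbtable", "target_table")
  .option("tempdir", "s3a://my-bucket/redshift-temp/")
  .option("tempformat", "CSV")  // default is AVRO
  .mode("append")
  .save()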
Generate schema from case class
Spark provides an easy way to generate a schema from a Scala case class. For case class A, use the method ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]. For example:

%scala
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection

case class A(key: String, time: java.sql.Timestamp, date: java....
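A complete, hedged sketch (the case class, fields, and path are hypothetical):

%scala
// Minimal sketch: derive a StructType from a case class and use it to read
// untyped data with an explicit schema. Case class and path are hypothetical.
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

case class Event(id: Long, name: String, ts: java.sql.Timestamp)

val schema = ScalaReflection.schemaFor[Event].dataType.asInstanceOf[StructType]
val df = spark.read.schema(schema).json("dbfs:/mnt/data/events.json")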
How to specify skew hints in dataset and DataFrame-based join commands
When you perform a join command with DataFrame or Dataset objects, if you find that the query is stuck on finishing a small number of tasks due to data skew, you can specify the skew hint with the hint("skew") method: df.hint("skew"). The skew join optimization (AWS | Azure | GCP) is performed on the DataFrame for which you specify the skew hint. In...
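A minimal sketch (the table and key names are hypothetical):

%scala
// Apply the skew hint to the relation whose join key is skewed; the optimization
// targets the DataFrame the hint is called on. Table and key names are hypothetical.
val facts = spark.table("facts").hint("skew")
val dims = spark.table("dims")

display(facts.join(dims, Seq("key")))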
How to update nested columns
Spark doesn’t support adding new columns or dropping existing columns in nested structures. In particular, the withColumn and drop methods of the Dataset class don’t allow you to specify a column name different from any top level columns. For example, suppose you have a dataset with the following schema:

%scala
val schema = (new StructType)
  .a...
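One common workaround, sketched with a hypothetical nested schema, is to rebuild the struct with the changed field:

%scala
// Minimal sketch: withColumn cannot target a nested field directly, so rebuild
// the struct with the field replaced. The nested schema is hypothetical.
import org.apache.spark.sql.functions.{col, lit, struct}

val df = spark.sql(
  "SELECT named_struct('street', '1 Main St', 'city', 'SF', 'zip', '94105') AS address")

val updated = df.withColumn(
  "address",
  struct(
    col("address.street").as("street"),
    col("address.city").as("city"),
    lit("00000").as("zip")  // replace the nested zip field
  )
)
updated.printSchema()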
Incompatible schema in some files
Problem

The Spark job fails with an exception like the following while reading Parquet files:

Error in SQL statement: SparkException: Job aborted due to stage failure: Task 20 in stage 11227.0 failed 4 times, most recent failure: Lost task 20.3 in stage 11227.0 (TID 868031, 10.111.245.219, executor 31): java.lang.UnsupportedOperationException: org.a...
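A hedged sketch of narrowing down the mismatch (paths are hypothetical): read a single suspect file and compare its schema with the rest of the table:

%scala
// Minimal sketch: inspect the schema of one file versus the whole directory to
// find which files disagree. Paths and file names are hypothetical.
val suspect = spark.read.parquet("dbfs:/mnt/data/parquet_table/part-00020.snappy.parquet")
suspect.printSchema()

val rest = spark.read.parquet("dbfs:/mnt/data/parquet_table/")
rest.printSchema()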
Access denied when writing to an S3 bucket using RDD
Problem

Writing to an S3 bucket using RDDs fails. The driver node can write, but the worker (executor) node returns an access denied error. Writing with the DataFrame API, however, works fine. For example, let’s say you run the following code:

%scala
import java.io.File
import java.io.Serializable
import org.apache.spark.{SparkConf, SparkContext}
imp...
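A minimal sketch of the DataFrame-based write that the problem statement notes does work (the bucket and path are hypothetical):

%scala
// Write through the DataFrame API so every executor uses the cluster's
// credentials. Bucket and path are hypothetical.
val df = spark.range(100).toDF("id")
df.write.mode("overwrite").parquet("s3a://my-bucket/output/ids")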
Invalid timestamp when loading data into Amazon Redshift
Problem

When you use a spark-redshift write operation to save timestamp data to Amazon Redshift, the following error can occur if that timestamp data includes timezone information.

Error (code 1206) while loading data into Redshift: "Invalid timestamp format or value [YYYY-MM-DD HH24:MI:SSOF]"

Cause

The Redshift table is using the Timestamp data typ...
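A hedged sketch of one possible workaround (connection values are hypothetical, and the assumption is that a COPY TIMEFORMAT passed through extracopyoptions will accept offset-bearing timestamps):

%scala
// Sketch only: pass a COPY TIMEFORMAT through extracopyoptions so Redshift
// accepts the timestamp strings. Connection values are hypothetical.
val df = spark.table("events")  // hypothetical source with timezone-aware timestamps

df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshift-host:5439/dev?user=<user>&password=<pass>")
  .option("dbtable", "events")
  .option("tempdir", "s3a://my-bucket/redshift-temp/")
  .option("extracopyoptions", "TIMEFORMAT 'auto'")
  .mode("append")
  .save()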
Unable to infer schema for ORC error
Problem

You are trying to read ORC files from a directory when you get an error message:

org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. It must be specified manually.

Cause

An Unable to infer schema for ORC error occurs when the schema is not defined and Apache Spark cannot infer the schema due to:

An empty directory.
Us...
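A minimal sketch of specifying the schema manually (the path and schema are hypothetical):

%scala
// Specify the schema explicitly so the read succeeds even when Spark cannot
// infer it from the directory. Path and schema are hypothetical.
import org.apache.spark.sql.types._

val orcSchema = new StructType()
  .add("id", LongType)
  .add("name", StringType)

val df = spark.read.schema(orcSchema).orc("dbfs:/mnt/data/orc_table/")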
Access files written by Apache Spark on ADLS Gen1
Problem

You are using Azure Databricks and have a Spark job that is writing to ADLS Gen1 storage. When you try to manually read, write, or delete data in the folders, you get an error message:

Forbidden. ACL verification failed. Either the resource does not exist or the user is not authorized to perform the requested operation

Cause

When writing data...
Object ownership is getting changed on dropping and recreating tables
Problem

Ownership of SQL objects changes after dropping and recreating them. This can result in job failures due to permission issues.

Cause

In Databricks Runtime 7.3 LTS, when jobs are run with table ACLs turned off, any action that drops and recreates tables or views preserves the table ACLs that were set the last time the job was run with table AC...
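A hedged sketch of one mitigation (names are hypothetical): explicitly reset the owner after the object is recreated so permissions no longer depend on which principal ran the job:

%scala
// Sketch only: reassign ownership of the recreated table. Database, table, and
// group names are hypothetical.
spark.sql("ALTER TABLE my_db.my_table OWNER TO `etl-admins`")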
User does not have permission SELECT on ANY File
Problem

You are trying to create an external Hive table, but keep getting a User does not have permission SELECT on any file error message.

java.lang.SecurityException: User does not have permission SELECT on any file.

Table access control (AWS | Azure | GCP) is enabled on your cluster and you are not an admin.

Cause

The Databricks SQL query analyzer e...
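A minimal sketch of the grant an admin can issue (the group name is hypothetical):

%scala
// An admin grants the ANY FILE privilege so non-admin users can create external
// tables over raw files. Group name is hypothetical.
spark.sql("GRANT SELECT ON ANY FILE TO `data-engineers`")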