Column value errors when connecting from Apache Spark to Databricks using Spark JDBC

Override the quoteIdentifier function in the JdbcDialect class and register the custom dialect with JdbcDialects, in Java or Python.

Written by swetha.nandajan

Last published at: October 22nd, 2024

Problem

When connecting Apache Spark to Databricks using Spark JDBC to read data from tables, you observe that column names are returned when you expect actual column values. For example, given a hypothetical table with an integer column id, a query such as select id from <table-name> returns the string "id" in every row instead of the integer values.

Alternatively, you may receive a SQL exception because the column name (a string) is returned where the column's actual data type, such as an integer, is expected.

Example of the SQL Exception 

[Databricks][JDBC](10140) Error converting value to int.
at com.databricks.client.exceptions.ExceptionConverter.toSQLException(Unknown Source)
at com.databricks.client.utilities.conversion.TypeConverter.toInt(Unknown Source)
at com.databricks.client.jdbc.common.SForwardResultSet.getInt(Unknown Source)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$7(JdbcUtils.scala:412)

 

Cause

Apache Spark's JDBC reader rewrites the query SELECT columnName FROM tableName as SELECT "columnName" FROM (SELECT columnName FROM tableName), quoting the column name with double quotes, which is the default dialect's identifier quoting. Databricks SQL parses double-quoted text as a string literal rather than as an identifier, so every returned row contains the literal column name instead of the column's value.
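
To make the mechanism concrete, here is a minimal Python sketch (the column name is illustrative, and the generated statements are shown in the comments) of the default double-quote quoting versus the backtick quoting that the solution below installs:

col = "columnName"
default_quoting = '"' + col + '"'   # -> "columnName", a string literal in Databricks SQL
backtick_quoting = "`" + col + "`"  # -> `columnName`, an identifier in Databricks SQL

# The generated statements therefore differ:
#   default: SELECT "columnName" FROM (SELECT columnName FROM tableName)
#   fixed:   SELECT `columnName` FROM (SELECT columnName FROM tableName)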

 

Solution

For Spark versions below 4.0.0, override the quoteIdentifier function in the JdbcDialect class and register the resulting dialect with JdbcDialects.

 

Java 

Define a class that extends JdbcDialect, then register it before using spark.read.jdbc to read the table.

package com;

import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;

public class dbsqlDialectClass extends JdbcDialect {
    @Override
    public boolean canHandle(String url) {
        // Apply this dialect only to Databricks JDBC connections.
        return url.startsWith("jdbc:databricks");
    }

    @Override
    public String quoteIdentifier(String colName) {
        // Quote identifiers with backticks instead of the default double quotes,
        // which Databricks SQL would parse as string literals.
        return "`" + colName + "`";
    }
}

 

After the Spark session has been created, register the dialect before using spark.read.jdbc to read the table.

// Register the custom dialect so Spark quotes identifiers with backticks.
dbsqlDialectClass dbsqlDialect = new dbsqlDialectClass();
JdbcDialects.registerDialect(dbsqlDialect);

String url = "jdbc:databricks://<your-domain>:443/default;transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/<warehouse-id>;UseNativeQuery=1;UID=token;connCatalog=hive_metastore;PWD=" + <access_token>;
String brie_pushdown_query1 = "select `<column-name>` from <table-name>";

Dataset<Row> df = spark.read()
    .format("jdbc")
    .option("url", url)
    .option("driver", "com.databricks.client.jdbc.Driver")
    .option("query", brie_pushdown_query1)
    .option("Auth_AccessToken", <access_token>)
    .load();

 

Python 

  1. Build the Java dialect class above into a side JAR.
  2. Add the side JAR to the classpath so that the Java Virtual Machine (JVM) loads it at startup (see the sketch after this list).
  3. Use the py4j module to import the class into the code snippet below.
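
The following sketch shows one way to handle steps 1 and 2. The build commands in the comments and the JAR path are illustrative assumptions; adjust them for your environment. Because the JAR must be on the driver classpath before the JVM starts, set it when the Spark session is created.

from pyspark.sql import SparkSession

# Hypothetical path to the side JAR built from dbsqlDialectClass, for example:
#   javac -cp "$SPARK_HOME/jars/*" com/dbsqlDialectClass.java
#   jar cf dbsql-dialect.jar com/dbsqlDialectClass.class
dialect_jar = "/path/to/dbsql-dialect.jar"

# Put the JAR on the driver classpath at session creation, before the JVM starts.
spark = (
    SparkSession.builder
    .appName("databricks-jdbc-read")
    .config("spark.driver.extraClassPath", dialect_jar)
    .config("spark.jars", dialect_jar)
    .getOrCreate()
)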

 

from py4j.java_gateway import java_import

# Import the dialect class (defined in the com package) into the JVM view
# and register it with Spark's dialect registry.
gw = spark.sparkContext._gateway
java_import(gw.jvm, "com.dbsqlDialectClass")
gw.jvm.org.apache.spark.sql.jdbc.JdbcDialects.registerDialect(gw.jvm.dbsqlDialectClass())

url = "jdbc:databricks://<your-domain>:443/default;transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/<warehouse-id>;UID=token;PWD=" + <access_token>

# Subquery alias form required by the dbtable option.
brie_pushdown_query = "(select `<column-name>` from <table-name>) as query"

loadedDF = (
    spark.read.format("jdbc")
    .option("url", url)
    .option("driver", "com.databricks.client.jdbc.Driver")
    .option("dbtable", brie_pushdown_query)
    .load()
)
loadedDF.show()
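
As an optional sanity check (a sketch reusing the gateway from above), you can confirm that the registered dialect is the one selected for the URL:

# JdbcDialects.get(url) returns the dialect Spark will use for this URL;
# after registration, canHandle should be True for the Databricks JDBC URL.
dialect = gw.jvm.org.apache.spark.sql.jdbc.JdbcDialects.get(url)
print(dialect.canHandle(url))  # expected: True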

 

Note

The issue has been fixed in Spark version 4.0.0. However, 4.0.0 is still in preview, so it should not be used for production workloads as of this article's publish date.