Problem
When you connect Apache Spark to Databricks over Spark JDBC to read data from tables, the query returns the column name itself on every row instead of the actual column values.
Alternatively, you may receive a SQL exception when the returned column name (a string) cannot be converted to the column's actual data type, such as an integer.
Example of the SQL Exception
[Databricks][JDBC](10140) Error converting value to int.
at com.databricks.client.exceptions.ExceptionConverter.toSQLException(Unknown Source)
at com.databricks.client.utilities.conversion.TypeConverter.toInt(Unknown Source)
at com.databricks.client.jdbc.common.SForwardResultSet.getInt(Unknown Source)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$7(JdbcUtils.scala:412)
Cause
Apache Spark's default JDBC dialect quotes identifiers with double quotes, so it converts the query SELECT columnName FROM tableName into SELECT "columnName" FROM (SELECT columnName FROM tableName). Databricks SQL interprets double-quoted text as a string literal rather than an identifier, so every row returned by the SQL contains the hard-coded column name instead of the column's values.
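Spark SQL applies the same quoting rule by default, so the effect is easy to reproduce locally. This is a minimal illustration, not part of the fix, assuming nothing beyond a running Spark session:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Double quotes make a string literal: every row contains the text "id".
spark.sql('SELECT "id" FROM range(3)').show()

# Backticks quote the identifier: the rows contain the values 0, 1, 2.
spark.sql('SELECT `id` FROM range(3)').show()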
Solution
For Spark versions below 4.0.0, override the quoteIdentifier function in the JdbcDialect class and register the resulting dialect with JdbcDialects.
Java
Define a class that extends JdbcDialect, and then register it before using spark.read.jdbc to read the table.
package com;

import org.apache.spark.sql.jdbc.JdbcDialect;

public class dbsqlDialectClass extends JdbcDialect {

    // Claim this dialect for Databricks JDBC URLs.
    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:databricks");
    }

    // Quote identifiers with backticks instead of the default double quotes,
    // so Databricks SQL treats them as column names, not string literals.
    @Override
    public String quoteIdentifier(String colName) {
        return "`" + colName + "`";
    }
}
After the Spark session has been created, register the dialect before using spark.read.jdbc to read the table.
// Register the custom dialect so Spark quotes identifiers with backticks.
dbsqlDialectClass dbsqlDialect = new dbsqlDialectClass();
JdbcDialects.registerDialect(dbsqlDialect);

String url = "jdbc:databricks://<your-domain>:443/default;transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/<warehouse-id>;UseNativeQuery=1;UID=token;connCatalog=hive_metastore;PWD=" + <access_token>;
String brie_pushdown_query1 = "select `<column-name>` from <table-name>";

Dataset<Row> df = spark.read()
    .format("jdbc")
    .option("url", url)
    .option("driver", "com.databricks.client.jdbc.Driver")
    .option("query", brie_pushdown_query1)
    .option("Auth_AccessToken", <access_token>)
    .load();
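With the dialect registered, df.show() prints the column's actual values; without it, every row contains the literal column name.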
Python
- Build a side jar from the Java dialect class above.
- Add the side jar to the classpath so that the Java Virtual Machine (JVM) loads it at startup (see the sketch after this list).
- Use the py4j module to import the class into the code snippet below.
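As a minimal sketch, assuming the compiled class is packaged into a jar at /path/to/dbsql-dialect.jar (a hypothetical path), you can put it on the driver classpath when the session is created. On an existing cluster, attach the jar as a library instead, since these settings must be in place before the JVM starts.

from pyspark.sql import SparkSession

# /path/to/dbsql-dialect.jar is a placeholder; point both settings at your side jar.
spark = (SparkSession.builder
    .config("spark.jars", "/path/to/dbsql-dialect.jar")
    .config("spark.driver.extraClassPath", "/path/to/dbsql-dialect.jar")
    .getOrCreate())

With the jar on the classpath, register the dialect through py4j: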
from py4j.java_gateway import java_import

gw = spark.sparkContext._gateway

# Import the dialect class (declared in package com) into the py4j JVM view.
java_import(gw.jvm, "com.dbsqlDialectClass")

# Register an instance with Spark's JdbcDialects registry.
gw.jvm.org.apache.spark.sql.jdbc.JdbcDialects.registerDialect(
    gw.jvm.dbsqlDialectClass())
url = "jdbc:databricks://<your-domain>:443/default;transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/<warehouse-id>;UID=token;PWD=" + <access_token>
brie_pushdown_query = "("select `<column-name>` from <table-name>") as query"
loadedDF = spark.read .format("jdbc") .option("url", url).option("driver", "com.databricks.client.jdbc.Driver").option("dbtable", brie_pushdown_query).load()
display(loadedDF)
Note
The issue has been fixed in Spark version 4.0.0. However, 4.0.0 is still in preview and is not recommended for production workloads as of this article's publish date.