Accessing Redshift fails with NullPointerException
Problem
Sometimes when you read a Redshift table:
val original_df = spark.read.
  format("com.databricks.spark.redshift").
  option("url", url).
  option("user", user).
  option("password", password).
  option("query", query).
  option("forward_spark_s3_credentials", true).
  option("tempdir", "path").
  load()
The Spark job throws a NullPointerException:
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:194)
Cause
The problem comes from the way Spark reads data from Redshift. The Amazon Redshift data source uses Redshift's unload format to read data: Spark first issues an UNLOAD command to Redshift, which dumps the contents of the table in the unload format to temporary files, and then Spark scans those temporary files. By default, this text-based unload format does not differentiate between an empty string and a null string: both are encoded as an empty string in the resulting file. When spark-redshift reads the data back, there is not enough information to tell whether the input was an empty string or a null, and it currently treats both as null. If the corresponding column is declared non-nullable in the Spark schema, the generated row writer does not guard against a null value, which produces the NullPointerException shown above.
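For intuition, the same failure can be reproduced without Redshift. The following sketch is illustrative rather than part of the original scenario (the column name and data are made up, and it reuses the notebook's spark session): it declares a String column as non-nullable while the underlying data contains a null, which is enough to hit the same code path.
%scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical repro, independent of Redshift: the schema promises that column
// "s" never contains nulls, but the data does.
val rows = spark.sparkContext.parallelize(Seq(Row("a"), Row(null)))
val schema = StructType(Seq(StructField("s", StringType, nullable = false)))

// Because the column is declared non-nullable, the generated row writer skips
// the null check and can fail with the same NullPointerException.
spark.createDataFrame(rows, schema).show()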
Solution
In Scala, set nullable to true for all the String columns:
%scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Returns a copy of the DataFrame's schema with the given nullability applied to
// every String column; all other columns are left unchanged.
def setNullableStateForAllStringColumns(df: DataFrame, nullable: Boolean): StructType = {
  StructType(df.schema.map {
    case StructField(c, StringType, _, m) => StructField(c, StringType, nullable = nullable, m)
    case StructField(c, t, n, m) => StructField(c, t, n, m)
  })
}
In Python:
%python
from pyspark.sql.types import StructType, StructField, StringType

def set_nullable_for_all_string_columns(df, nullable):
  # Rebuild the schema, applying the given nullability to every String column
  # and leaving all other columns unchanged.
  new_schema = StructType([StructField(f.name, f.dataType, nullable, f.metadata)
                           if isinstance(f.dataType, StringType)
                           else StructField(f.name, f.dataType, f.nullable, f.metadata)
                           for f in df.schema.fields])
  return new_schema
To use this function, get the schema of original_df, modify the schema to make all String columns nullable, and then re-read the data from Redshift:
%scala
val df = spark.read.
  schema(setNullableStateForAllStringColumns(original_df, true)).
  format("com.databricks.spark.redshift").
  option("url", url).
  option("user", user).
  option("password", password).
  option("query", query).
  option("forward_spark_s3_credentials", true).
  option("tempdir", "path").
  load()
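As a quick check (not part of the original fix), you can print the nullability of the String columns in the re-read DataFrame to confirm that the user-supplied schema took effect:
%scala
import org.apache.spark.sql.types.StringType

// Every String column should now report nullable = true.
df.schema.fields.
  filter(_.dataType == StringType).
  foreach(f => println(s"${f.name}: nullable=${f.nullable}"))
Keep in mind that this only works around the NullPointerException: because of the unload format, empty strings stored in Redshift are still read back as nulls.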