Accessing Redshift Fails with NullPointerException

Problem

Sometimes when you read a Redshift table:

val original_df = spark.read.
      format("com.databricks.spark.redshift").
      option("url", url).
      option("user", user).
      option("password", password).
      option("query", query).
      option("forward_spark_s3_credentials", true).
      option("tempdir", "path").
      load()

The Spark job will throw a NullPointerException:

Caused by: java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:194)

Cause

The problem comes from the way Spark reads data from Redshift. The Amazon Redshift data source uses Redshift's UNLOAD command to read data: Spark first issues an UNLOAD to Redshift, which dumps the contents of the table in a text-based unload format to temporary files, and then Spark scans those files. By default, this format does not differentiate between an empty string and a null string: both are encoded as an empty string in the resulting file. When spark-redshift reads the data back, there is not enough information to tell whether the input was an empty string or a null, and it currently treats both as null. If the schema marks a String column as non-nullable, writing that null value fails with the NullPointerException above.
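As a minimal illustration of the ambiguity (plain Python standing in for the pipe-delimited unload text format; the row values are hypothetical):

```python
# Two rows: one with an empty string, one with a NULL (None).
rows = [("alice", ""), ("bob", None)]

def unload_line(row):
    """Mimic a text-based unload: both '' and None serialize to nothing."""
    return "|".join("" if field is None else field for field in row)

lines = [unload_line(r) for r in rows]
print(lines)  # ['alice|', 'bob|'] -- the two rows are now indistinguishable

# On read-back there is no way to recover the distinction; the reader
# must pick one interpretation, and spark-redshift picks null.
parsed = [tuple(line.split("|")) for line in lines]
```

Once the lines are written, the empty string and the null produce byte-identical output, so no reader can recover which was which.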

Solution

In Scala, define a helper that rewrites a DataFrame's schema so that all the String columns have the given nullability:

%scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType, StringType}

def setNullableStateForAllStringColumns(df: DataFrame, nullable: Boolean) = {
  StructType(df.schema.map {
    // Rewrite String columns with the requested nullability
    case StructField(c, StringType, _, m) => StructField(c, StringType, nullable = nullable, m)
    // Leave all other columns unchanged
    case f => f
  })
}

The equivalent helper in Python:

%python
from pyspark.sql.types import StructType, StructField, StringType

def set_nullable_for_all_string_columns(df, nullable):
    # Rewrite String columns with the requested nullability; leave others unchanged
    return StructType([StructField(f.name, f.dataType, nullable, f.metadata)
                       if isinstance(f.dataType, StringType)
                       else f
                       for f in df.schema.fields])

To use this function, get the schema of original_df, modify it so that every String column is nullable, and then re-read from Redshift:

%scala
val df = spark.read.schema(setNullableStateForAllStringColumns(original_df, true)).
      format("com.databricks.spark.redshift").
      option("url", url).
      option("user", user).
      option("password", password).
      option("query", query).
      option("forward_spark_s3_credentials", true).
      option("tempdir", "path").
      load()
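For completeness, the equivalent re-read in Python (a sketch: url, user, password, query, and the tempdir path are the same placeholders used in the Scala cell, and a live Redshift connection is required):

```
%python
df = (spark.read
      .schema(set_nullable_for_all_string_columns(original_df, True))
      .format("com.databricks.spark.redshift")
      .option("url", url)
      .option("user", user)
      .option("password", password)
      .option("query", query)
      .option("forward_spark_s3_credentials", "true")
      .option("tempdir", "path")
      .load())
```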