Problem
Sometimes when you read a Redshift table:
%scala

val original_df = spark.read.
  format("com.databricks.spark.redshift").
  option("url", url).
  option("user", user).
  option("password", password).
  option("query", query).
  option("forward_spark_s3_credentials", true).
  option("tempdir", "path").
  load()
The Spark job will throw a NullPointerException:
Caused by: java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:194)
Cause
The problem comes from the way Spark reads data from Redshift. The Amazon Redshift data source uses Redshift's UNLOAD format: Spark first issues an UNLOAD command that makes Redshift dump the contents of the table to temporary files, and then Spark scans those temporary files. By default, this text-based unload format does not differentiate between an empty string and a null string; both are encoded as an empty string in the resulting file. When spark-redshift reads the unloaded data, it has no way to tell whether the input was an empty string or a null, and it currently treats both as null.
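As a rough illustration of the ambiguity (hypothetical data, not the actual unload output), consider two pipe-delimited rows, one whose second column was an empty string and one whose second column was NULL; after unloading they are byte-for-byte identical, so the reader can only pick one interpretation:

%scala

// Hypothetical unload lines: row "a" had an empty string, row "b" had NULL.
// Both serialize to the same text, so the distinction is lost.
val unloadedLines = Seq("a|", "b|")

// split with limit -1 keeps the trailing empty field
val secondColumn = unloadedLines.map(_.split("\\|", -1)(1))

// With no marker to distinguish the two cases, the reader treats both as null.
val interpreted = secondColumn.map(s => if (s.isEmpty) null else s) // Seq(null, null)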
Solution
In Scala, set nullable to true for all the string columns:
%scala

import org.apache.spark.sql.types.{StructField, StructType, StringType}
import org.apache.spark.sql.{DataFrame, SQLContext}

def setNullableStateForAllStringColumns(df: DataFrame, nullable: Boolean) = {
  StructType(df.schema.map {
    case StructField(c, StringType, _, m) => StructField(c, StringType, nullable = nullable, m)
    case StructField(c, t, n, m)          => StructField(c, t, n, m)
  })
}
In Python:
%python

def set_nullable_for_all_string_columns(df, nullable):
    from pyspark.sql.types import StructType, StructField, StringType
    new_schema = StructType([
        StructField(f.name, f.dataType, nullable, f.metadata)
        if isinstance(f.dataType, StringType)
        else StructField(f.name, f.dataType, f.nullable, f.metadata)
        for f in df.schema.fields
    ])
    return new_schema
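For Python, a minimal usage sketch under the same assumptions as the original read (url, user, password, query, and the "path" tempdir placeholder are defined as before):

%python

# Re-read from Redshift using the relaxed schema so empty/null string columns
# are accepted as nullable.
new_df = (spark.read
    .schema(set_nullable_for_all_string_columns(original_df, True))
    .format("com.databricks.spark.redshift")
    .option("url", url)
    .option("user", user)
    .option("password", password)
    .option("query", query)
    .option("forward_spark_s3_credentials", "true")
    .option("tempdir", "path")
    .load())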
In Scala, to use setNullableStateForAllStringColumns, take the schema of original_df, modify it so that all string columns are nullable, and then re-read from Redshift:
%scala

val df = spark.read.schema(setNullableStateForAllStringColumns(original_df, true)).
  format("com.databricks.spark.redshift").
  option("url", url).
  option("user", user).
  option("password", password).
  option("query", query).
  option("forward_spark_s3_credentials", true).
  option("tempdir", "path").
  load()
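As an optional sanity check (a minimal sketch, not part of the original workflow), you can confirm that every string column in the re-read DataFrame is now marked nullable:

%scala

import org.apache.spark.sql.types.StringType

// Print the nullability of every string column in the re-read DataFrame;
// each one should now report nullable = true.
df.schema.fields
  .filter(_.dataType == StringType)
  .foreach(f => println(s"${f.name}: nullable = ${f.nullable}"))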