Problem
When you try to count corrupt records after passing a custom schema while reading a file, the count always returns zero, even though the DataFrame contains corrupt records.
Example
df = spark.read.schema(<schema>).option("columnNameOfCorruptRecord","_corrupt_record").csv(<filename>)
corrupt_records = df.filter("_corrupt_record is not null")
corrupt_records.count()
Here count() returns 0 even though there are records in the corrupt_records DataFrame, and the same thing happens when reading multiple CSV files.
Cause
When you filter on only the corrupt record column, Spark's column pruning optimization removes all other columns from the query plan. Because no other columns are referenced, Spark doesn't read any data from the source, so the count comes back as zero.
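You can see the effect of the pruning in the physical plan. The following is a minimal sketch, assuming an existing spark session and a hypothetical schema and file path; the ReadSchema shown for the FileScan node in the explain() output typically lists only the corrupt record column.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical schema and path, for illustration only. The corrupt record
# column is declared as a nullable string so malformed rows can land in it.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True),
])

df = (
    spark.read
    .schema(schema)
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .csv("/path/to/data.csv")
)

# After column pruning, the FileScan node's ReadSchema typically lists only
# _corrupt_record, so none of the real data columns are parsed.
df.filter("_corrupt_record is not null").explain()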
Solution
Collect the records into a list and take the length of that list instead of calling count().
Example
df = spark.read.schema(<schema>).option("columnNameOfCorruptRecord","_corrupt_record").csv(<filename>)
corrupt_records = df.filter("_corrupt_record is not null")
len(corrupt_records.collect())
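For completeness, here is a self-contained sketch of the same workaround. The schema and sample rows are hypothetical, and it relies on an existing spark session; it parallelizes two CSV lines (the second has a non-integer age, making it a corrupt record under the schema) so no input file is needed.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical schema for illustration. The corrupt record column must be
# declared as a nullable string so malformed rows can be captured in it.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True),
])

# Two sample CSV lines; the second has a non-integer age and is therefore
# treated as a corrupt record when parsed against the schema above.
lines = spark.sparkContext.parallelize(["alice,34", "bob,not-a-number"])

df = (
    spark.read
    .schema(schema)
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .csv(lines)
)

corrupt_records = df.filter("_corrupt_record is not null")

# collect() materializes every column, so the corrupt record column is
# populated and the list length reflects the real number of corrupt rows.
print(len(corrupt_records.collect()))  # expected: 1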