Problem
In Spark SQL, you use the dropDuplicates() operation on a DataFrame that contains variant type fields. The following code shows a usage example.
from pyspark.sql.functions import parse_json, col

# adding a variant type field to a dataframe
df = df.withColumn("<field-name>", parse_json(col("<field-with-json-as-text>")))
# dropping duplicates
df = df.dropDuplicates()
You then receive the following error.
The feature is not supported: Cannot have VARIANT type columns in DataFrame which calls set operations (INTERSECT, EXCEPT, etc.), but the type of column `<field-name>` is "VARIANT".
You may also see this kind of MatchError in your logs.
25/03/03 00:31:55 ERROR Executor: Exception in task 2.0 in stage 51.0 (TID 201)
scala.MatchError: {"x":11,"y":22} (of class org.apache.spark.unsafe.types.VariantVal)
at org.apache.spark.sql.catalyst.expressions.InterpretedHashFunction.hash(hash.scala:559)
at org.apache.spark.sql.catalyst.expressions.Murmur3Hash.computeHash(hash.scala:658)
at org.apache.spark.sql.catalyst.expressions.Murmur3Hash.computeHash(hash.scala:648)
at org.apache.spark.sql.catalyst.expressions.HashExpression.eval(hash.scala:316)
at org.apache.spark.sql.catalyst.expressions.Pmod.eval(arithmetic.scala:1117)
Cause
Comparison operations such as dropDuplicates(), INTERSECT, and EXCEPT are not supported on variant type fields.
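The same limitation applies when you call a set operation directly on a DataFrame that contains a variant column. The following minimal sketch, which assumes df already holds the parsed variant field from the example above, fails with the same error.
# a set operation on a DataFrame with a VARIANT column raises the same error
df.intersect(df).show()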
Solution
There are three options.
1. Execute the dropDuplicates() operation before converting to a variant type. (As an added benefit, removing duplicates before parsing may also reduce runtime.)
df = df.dropDuplicates()
df = df.withColumn("<field-name>", parse_json(col("<field-with-json-as-text>")))
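If the raw JSON text column alone determines uniqueness, you can also deduplicate on just that column before parsing. This is a sketch under that assumption; note that dropDuplicates() with a column subset keeps an arbitrary row for each duplicate key.
# deduplicating on the JSON text column only, then parsing
df = df.dropDuplicates(["<field-with-json-as-text>"])
df = df.withColumn("<field-name>", parse_json(col("<field-with-json-as-text>")))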
2. Instead of using VariantType in a column, use a data type that is compatible with dropDuplicates(). For example, use Apache Spark's built-in functions to parse the semi-structured data into a StructType or MapType. The following code provides an example using MapType.
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import MapType, StringType

# creating the schema
json_map_schema = MapType(StringType(), StringType())

# adding the MapType field, then dropping duplicates
df = df.withColumn("<field-name>", from_json(col("<field-with-json-as-text>"), json_map_schema))
df = df.dropDuplicates()
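If your JSON documents share a known, fixed structure, you can parse them into a StructType instead, which preserves field types. The following sketch assumes JSON shaped like {"x":11,"y":22}; adjust the schema to match your data.
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, IntegerType

# assumed example schema for JSON like {"x":11,"y":22}
json_struct_schema = StructType([
    StructField("x", IntegerType()),
    StructField("y", IntegerType()),
])

df = df.withColumn("<field-name>", from_json(col("<field-with-json-as-text>"), json_struct_schema))
df = df.dropDuplicates()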
3. Implement custom deduplication logic using window functions. For example, you can assign row numbers to partitions of the data based on certain columns, and then filter out duplicates by keeping only the first row in each partition.
df = df.withColumn("<field-name>", parse_json(
col(‘<field-with-json-as-text>’)))
# using window function to drop duplicates
df = df.withColumn("row_num", row_number().over(window_spec))
df = df.filter(col("row_num") == 1).drop("row_num")
For more information, review the VARIANT type (AWS | Azure | GCP) documentation.