Problem
Sometimes Apache Spark jobs hang indefinitely due to the non-deterministic behavior of a Spark User-Defined Function (UDF). Here is an example of such a function:
%scala
import org.apache.spark.sql.functions.udf

val convertorUDF = (commentCol: String) => {
  // UDF definition
}
val translateColumn = udf(convertorUDF)
If you call this UDF with the withColumn() API and then apply a filter transformation to the resulting DataFrame, the UDF can execute multiple times for each record, degrading application performance:
%scala
val translatedDF = df.withColumn("translatedColumn", translateColumn(df("columnToTranslate")))
val filteredDF = translatedDF.filter(
  !translatedDF("translatedColumn").contains("Invalid URL Provided") &&
  !translatedDF("translatedColumn").contains("Unable to connect to Microsoft API")
)
Cause
Even a deterministic UDF can be invoked more than once per record: Spark's optimizer does not guarantee how many times a UDF expression is evaluated, and it can rewrite the plan so that the expression appears in several places. You often see this behavior when you use a UDF to add a column to a DataFrame with the withColumn() API and then apply a transformation, such as filter(), to the resulting DataFrame, because the filter predicate that references the new column is rewritten in terms of the UDF itself.
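To observe the duplicate invocations directly, you can count calls with an accumulator. The following is a minimal sketch, not part of the original job: the sample data, UDF, and accumulator name are hypothetical, and the exact call count depends on the optimized plan.

%scala
import org.apache.spark.sql.functions.udf
import spark.implicits._

// Hypothetical setup: count how many times the UDF body actually runs.
val invocations = spark.sparkContext.longAccumulator("udfInvocations")

val tagUDF = (url: String) => {
  invocations.add(1) // side effect, for observation only
  if (url.startsWith("http")) "ok" else "Invalid URL Provided"
}
val tagColumn = udf(tagUDF)

val sample = Seq("http://example.com", "not-a-url").toDF("columnToTranslate")
val tagged = sample.withColumn("translatedColumn", tagColumn(sample("columnToTranslate")))
val filtered = tagged.filter(!tagged("translatedColumn").contains("Invalid URL Provided"))

filtered.collect()
// The accumulator value can exceed the input row count, showing that the UDF
// ran more than once per record (once in the filter, again in the projection).
println(s"UDF invocations: ${invocations.value}")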
Solution
Spark assumes UDFs are deterministic. Because of optimization, duplicate invocations may be eliminated, or the function may be invoked more times than it appears in the query.
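If your UDF genuinely is nondeterministic, for example because it calls an external service, you can declare that with asNondeterministic(), which restricts how the optimizer may move or duplicate the expression. A minimal sketch, reusing the convertorUDF defined above:

%scala
import org.apache.spark.sql.functions.udf

// Mark the UDF as nondeterministic so the optimizer does not assume that
// duplicating or eliminating invocations is safe.
val translateColumn = udf(convertorUDF).asNondeterministic()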
The most reliable workaround is to cache the DataFrame immediately after applying the UDF, so the UDF output is computed once and reused by later transformations. If the DataFrame contains a large amount of data, writing it to a Parquet file and reading it back is the better option (see the sketch after the cache example below).
You can use the following code to cache the result:
%scala
// Cache the result so the UDF runs only once per record; the subsequent
// filter reads the cached column instead of re-invoking the UDF.
val translatedDF = df
  .withColumn("translatedColumn", translateColumn(df("columnToTranslate")))
  .cache()
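If the DataFrame is too large to cache, you can materialize the UDF output to Parquet and filter the materialized copy instead. This is a sketch of the same idea; the path /tmp/translated_parquet is a hypothetical example:

%scala
// Write the UDF output once...
df.withColumn("translatedColumn", translateColumn(df("columnToTranslate")))
  .write.mode("overwrite").parquet("/tmp/translated_parquet")

// ...then filter the materialized copy, so the UDF is never re-executed.
val translatedDF = spark.read.parquet("/tmp/translated_parquet")
val filteredDF = translatedDF.filter(
  !translatedDF("translatedColumn").contains("Invalid URL Provided") &&
  !translatedDF("translatedColumn").contains("Unable to connect to Microsoft API")
)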