Simplify chained transformations

Learn how to simplify chained transformations on your DataFrame in Databricks.

Written by Adam Pavlacka

Last published at: May 25th, 2022

Sometimes you may need to perform multiple transformations on your DataFrame:

%scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame

val testDf = (1 to 10).toDF("col")

def func0(x: Int => Int, y: Int)(in: DataFrame): DataFrame = {
  in.filter('col > x(y))
}
def func1(x: Int)(in: DataFrame): DataFrame = {
  in.selectExpr("col", s"col + $x as col1")
}
def func2(add: Int)(in: DataFrame): DataFrame = {
  in.withColumn("col2", expr(s"col1 + $add"))
}

When you apply these transformations, you may end up with spaghetti code like this:

%scala

def inc(i: Int) = i + 1

val tmp0 = func0(inc, 3)(testDf)
val tmp1 = func1(1)(tmp0)
val tmp2 = func2(2)(tmp1)
val res = tmp2.withColumn("col3", expr("col2 + 3"))

This article describes several methods to simplify chained transformations.

DataFrame transform API

To benefit from the functional programming style in Spark, you can leverage the DataFrame transform API, for example:

%scala

val res = testDf.transform(func0(inc, 4))
                .transform(func1(1))
                .transform(func2(2))
                .withColumn("col3", expr("col2 + 3"))

Function.chain API

To go even further, you can leverage the Scala Function library, to chain the transformations, for example:

%scala

val chained = Function.chain(List(func0(inc, 4)(_), func1(1)(_), func2(2)(_)))
val res = testDf.transform(chained)
                .withColumn("col3", expr("col2 + 3"))

implicit class

Another alternative is to define a Scala implicit class, which allows you to eliminate the DataFrame transform API:

%scala

implicit class MyTransforms(df: DataFrame) {
    def func0(x: Int => Int, y: Int): DataFrame = {
        df.filter('col > x(y))
    }
    def func1(x: Int): DataFrame = {
        df.selectExpr("col", s"col + $x as col1")
    }
    def func2(add: Int): DataFrame = {
        df.withColumn("col2", expr(s"col1 + $add"))
    }
}

Then you can call the functions directly:

%scala

val res = testDf.func0(inc, 1)
            .func1(2)
            .func2(3)
            .withColumn("col3", expr("col2 + 3"))