Sometimes you may need to perform multiple transformations on your DataFrame:
%scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import spark.implicits._  // provides toDF and the 'col column syntax (already in scope in Databricks notebooks)

val testDf = (1 to 10).toDF("col")

// Keep only the rows where col is greater than x(y)
def func0(x: Int => Int, y: Int)(in: DataFrame): DataFrame = {
  in.filter('col > x(y))
}

// Add col1 = col + x
def func1(x: Int)(in: DataFrame): DataFrame = {
  in.selectExpr("col", s"col + $x as col1")
}

// Add col2 = col1 + add
def func2(add: Int)(in: DataFrame): DataFrame = {
  in.withColumn("col2", expr(s"col1 + $add"))
}

When you apply these transformations, you may end up with spaghetti code like this:
%scala
def inc(i: Int) = i + 1
val tmp0 = func0(inc, 3)(testDf)
val tmp1 = func1(1)(tmp0)
val tmp2 = func2(2)(tmp1)
val res = tmp2.withColumn("col3", expr("col2 + 3"))

This article describes several methods to simplify chained transformations.
DataFrame transform API
To benefit from the functional programming style in Spark, you can leverage the DataFrame transform API, for example:
%scala
val res = testDf.transform(func0(inc, 4))
  .transform(func1(1))
  .transform(func2(2))
  .withColumn("col3", expr("col2 + 3"))
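
Note that transform just takes a function from DataFrame to DataFrame, so for a one-off step you can also pass an anonymous function inline. A minimal sketch reusing testDf from above:
%scala
// Inline transformation: no named helper function is required
val resInline = testDf.transform(df => df.filter('col > 2))
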
Function.chain API
To go even further, you can leverage the Scala Function library to chain the transformations, for example:
%scala
val chained = Function.chain(List(func0(inc, 4)(_), func1(1)(_), func2(2)(_)))
val res = testDf.transform(chained)
  .withColumn("col3", expr("col2 + 3"))
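
Function.chain has the same effect as composing the functions with andThen, which you may prefer when there are only a few steps. A minimal sketch using the same functions:
%scala
// Compose the partially applied functions left to right with andThen
val composed = (func0(inc, 4) _) andThen (func1(1) _) andThen (func2(2) _)
val resComposed = testDf.transform(composed)
  .withColumn("col3", expr("col2 + 3"))
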
implicit class
Another alternative is to define a Scala implicit class, which lets you call the functions directly on the DataFrame and avoid the transform API altogether:
%scala
implicit class MyTransforms(df: DataFrame) {
  def func0(x: Int => Int, y: Int): DataFrame = {
    df.filter('col > x(y))
  }
  def func1(x: Int): DataFrame = {
    df.selectExpr("col", s"col + $x as col1")
  }
  def func2(add: Int): DataFrame = {
    df.withColumn("col2", expr(s"col1 + $add"))
  }
}

Then you can call the functions directly:
%scala
val res = testDf.func0(inc, 1)
  .func1(2)
  .func2(3)
  .withColumn("col3", expr("col2 + 3"))
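
As a quick sanity check (a sketch based on the definitions above): only rows with col > inc(1) = 2 remain, and every remaining row should satisfy col1 = col + 2, col2 = col + 5, and col3 = col + 8.
%scala
// Display the result to verify the chained transformations end to end
res.show()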