Spark doesn't support adding new fields to, or dropping existing fields from, nested structures. In particular, the withColumn and drop methods of the Dataset class operate only on top-level columns: they treat a dotted name such as items.books.fees as a literal column name, not as a path into a struct. For example, suppose you have a dataset with the following schema:
%scala
import org.apache.spark.sql.types.StructType

val schema = (new StructType)
  .add("metadata", (new StructType)
    .add("eventid", "string", true)
    .add("hostname", "string", true)
    .add("timestamp", "string", true), true)
  .add("items", (new StructType)
    .add("books", (new StructType).add("fees", "double", true), true)
    .add("paper", (new StructType).add("pages", "int", true), true), true)

schema.treeString

The schema looks like:
root
 |-- metadata: struct (nullable = true)
 |    |-- eventid: string (nullable = true)
 |    |-- hostname: string (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |-- items: struct (nullable = true)
 |    |-- books: struct (nullable = true)
 |    |    |-- fees: double (nullable = true)
 |    |-- paper: struct (nullable = true)
 |    |    |-- pages: integer (nullable = true)
Now suppose you create a DataFrame with this schema:
%scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

val rdd: RDD[Row] = sc.parallelize(Seq(Row(
  Row("eventid1", "hostname1", "timestamp1"),
  Row(Row(100.0), Row(10)))))
val df = spark.createDataFrame(rdd, schema)

display(df)

You want to increase the fees column, which is nested under books, by 1%.
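Before doing that, it helps to see the limitation directly. The following is a minimal sketch, using the df defined above, showing that withColumn and drop treat a dotted name as a literal top-level column name rather than a path into the struct:

%scala
import org.apache.spark.sql.functions.lit

// withColumn does not descend into structs: this adds a NEW top-level
// column whose name is literally "items.books.fees", leaving the nested
// field untouched.
df.withColumn("items.books.fees", lit(0.0)).printSchema()

// drop only removes top-level columns; no top-level column is named
// "items.books.fees", so this is a no-op and df keeps its full schema.
df.drop("items.books.fees").printSchema()

To update the fees column, you can instead reconstruct the dataset from the existing columns and the updated column as follows: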
%scala
import spark.implicits._

val updated = df.selectExpr("""
  named_struct(
    'metadata', metadata,
    'items', named_struct(
      'books', named_struct('fees', items.books.fees * 1.01),
      'paper', items.paper
    )
  ) as named_struct
""").select($"named_struct.metadata", $"named_struct.items")

updated.show(false)

The selectExpr rebuilds both columns inside a single wrapper struct, and the trailing select flattens that wrapper back into the top-level metadata and items columns. You then get the result:
+---------------------------------+---------------+
|metadata                         |items          |
+---------------------------------+---------------+
|[eventid1, hostname1, timestamp1]|[[101.0], [10]]|
+---------------------------------+---------------+
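The same update can also be written with the DataFrame API's struct function instead of a SQL expression. This is an equivalent sketch, not part of the original example; note that withColumn works here only because it replaces the entire top-level items column:

%scala
import org.apache.spark.sql.functions.struct

// Rebuild the whole items struct in one pass: recompute books.fees and
// copy the paper struct through unchanged.
val updatedApi = df.withColumn("items", struct(
  struct(($"items.books.fees" * 1.01).as("fees")).as("books"),
  $"items.paper".as("paper")))

updatedApi.show(false)

This should produce the same result as the selectExpr version above.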