Spark doesn’t support adding new columns or dropping existing columns in nested structures. In particular, the withColumn and drop methods of the Dataset class operate only on top-level columns; they don’t accept the qualified name of a nested column. For example, suppose you have a dataset with the following schema:
%scala

import org.apache.spark.sql.types._

val schema = (new StructType)
  .add("metadata", (new StructType)
    .add("eventid", "string", true)
    .add("hostname", "string", true)
    .add("timestamp", "string", true)
  , true)
  .add("items", (new StructType)
    .add("books", (new StructType).add("fees", "double", true), true)
    .add("paper", (new StructType).add("pages", "int", true), true)
  , true)

schema.treeString
The schema looks like:
root
 |-- metadata: struct (nullable = true)
 |    |-- eventid: string (nullable = true)
 |    |-- hostname: string (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |-- items: struct (nullable = true)
 |    |-- books: struct (nullable = true)
 |    |    |-- fees: double (nullable = true)
 |    |-- paper: struct (nullable = true)
 |    |    |-- pages: integer (nullable = true)
Suppose you have the DataFrame:
%scala

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

val rdd: RDD[Row] = sc.parallelize(Seq(Row(
  Row("eventid1", "hostname1", "timestamp1"),
  Row(Row(100.0), Row(10)))))
val df = spark.createDataFrame(rdd, schema)
display(df)
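To see the limitation concretely, here is a minimal sketch (the $ column syntax is available by default in Databricks notebooks; elsewhere add import spark.implicits._). Both drop and withColumn compare the given name against top-level column names only, so neither call reaches the nested field:

%scala

// drop() matches top-level column names only, so passing a dotted
// nested name is a no-op: the DataFrame comes back unchanged.
df.drop("items.books.fees").printSchema()

// withColumn() likewise treats the dotted name as a literal new
// top-level column called "items.books.fees" rather than updating
// the nested fees field.
df.withColumn("items.books.fees", $"items.books.fees" * 1.01).printSchema()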
You want to increase the fees column, which is nested under books, by 1%. Because you can’t update the nested field in place, you reconstruct the enclosing structs from the existing columns plus the updated value:
%scala

val updated = df.selectExpr("""
  named_struct(
    'metadata', metadata,
    'items', named_struct(
      'books', named_struct('fees', items.books.fees * 1.01),
      'paper', items.paper
    )
  ) as named_struct
""").select($"named_struct.metadata", $"named_struct.items")

updated.show(false)
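If you prefer the Column API to a SQL expression, the same reconstruction can be written with org.apache.spark.sql.functions.struct (a sketch; updatedApi is just an illustrative name):

%scala

import org.apache.spark.sql.functions.struct

// Rebuild items with the Column API: every nested field is re-listed,
// and fees is replaced by the updated value.
val updatedApi = df.select(
  $"metadata",
  struct(
    struct(($"items.books.fees" * 1.01).as("fees")).as("books"),
    $"items.paper".as("paper")
  ).as("items")
)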
In both cases, the result is:
+---------------------------------+---------------+
|metadata                         |items          |
+---------------------------------+---------------+
|[eventid1, hostname1, timestamp1]|[[101.0], [10]]|
+---------------------------------+---------------+
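The same reconstruction also handles adding and dropping nested fields, since you simply re-list the fields you want to keep. A sketch under the schema above (the discount field is a made-up example): it adds a new field under books and drops the paper struct by omitting it from the rebuilt items:

%scala

val reshaped = df.selectExpr("""
  named_struct(
    'metadata', metadata,
    'items', named_struct(
      'books', named_struct(
        'fees', items.books.fees,
        'discount', items.books.fees * 0.10
      )
    )
  ) as named_struct
""").select($"named_struct.metadata", $"named_struct.items")

reshaped.printSchema()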