This article shows you how to use Apache Spark functions to generate unique increasing numeric values in a column.
We review three different methods. Select the one that works best for your use case.
Use zipWithIndex() in a Resilient Distributed Dataset (RDD)
The zipWithIndex() function is only available within RDDs. You cannot use it directly on a DataFrame.
Convert your DataFrame to an RDD, apply zipWithIndex() to your data, and then convert the RDD back to a DataFrame.
We are going to use the following example code to add unique id numbers to a basic table with two entries.
%python

from pyspark.sql.functions import col

df = spark.createDataFrame(
  [('Alice', '10'), ('Susan', '12')],
  ['Name', 'Age']
)

df1 = df.rdd.zipWithIndex().toDF()
df2 = df1.select(col("_1.*"), col("_2").alias('increasing_id'))
df2.show()
Run the example code and we get the following results:
+-----+---+-------------+
| Name|Age|increasing_id|
+-----+---+-------------+
|Alice| 10|            0|
|Susan| 12|            1|
+-----+---+-------------+
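The intermediate DataFrame uses generated struct column names (_1 for the original row and _2 for the index), which is why the select statement above expands _1.*. If you prefer to avoid the struct expansion, a variant sketch (the lambda and column list here are illustrative, not part of the example above) flattens each row before converting back to a DataFrame:

%python

# Variant sketch: append the index to each row, so the resulting DataFrame
# keeps the original column names plus an 'increasing_id' column.
df_flat = (
    df.rdd
    .zipWithIndex()
    .map(lambda pair: tuple(pair[0]) + (pair[1],))  # (Row, index) -> flat tuple
    .toDF(df.columns + ['increasing_id'])
)
df_flat.show()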
Use monotonically_increasing_id() for unique, but not consecutive numbers
The monotonically_increasing_id() function generates monotonically increasing 64-bit integers.
The generated id numbers are guaranteed to be increasing and unique, but they are not guaranteed to be consecutive.
We are going to use the following example code to add monotonically increasing id numbers to a basic table with two entries.
%python

from pyspark.sql.functions import *

df_with_increasing_id = df.withColumn("monotonically_increasing_id", monotonically_increasing_id())
df_with_increasing_id.show()
Run the example code and we get the following results:
+-----+---+---------------------------+
| Name|Age|monotonically_increasing_id|
+-----+---+---------------------------+
|Alice| 10|                 8589934592|
|Susan| 12|                25769803776|
+-----+---+---------------------------+
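The generated values are large because of how they are constructed: in the current Spark implementation, the partition ID occupies the upper 31 bits and the record number within each partition occupies the lower 33 bits. As an illustration only (the decode_id helper is not a Spark function), this sketch decodes the two IDs from the output above:

%python

# Illustration only: split a monotonically_increasing_id value into its
# partition ID (upper 31 bits) and per-partition record number (lower 33 bits).
def decode_id(generated_id):
    partition_id = generated_id >> 33
    record_number = generated_id & ((1 << 33) - 1)
    return partition_id, record_number

print(decode_id(8589934592))   # (1, 0): first record in partition 1
print(decode_id(25769803776))  # (3, 0): first record in partition 3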
Combine monotonically_increasing_id() with row_number() for two columns
The row_number() window function generates consecutive numbers. Combine it with monotonically_increasing_id() to generate two columns of numbers that can be used to identify data entries. Keep in mind that ordering a window without a partition specification moves all of the data into a single partition, so this approach is best suited to DataFrames of modest size.
We are going to use the following example code to add monotonically increasing id numbers and row numbers to a basic table with two entries.
%python

from pyspark.sql.functions import *
from pyspark.sql.window import *

window = Window.orderBy(col('monotonically_increasing_id'))
df_with_consecutive_increasing_id = df_with_increasing_id.withColumn('increasing_id', row_number().over(window))
df_with_consecutive_increasing_id.show()
Run the example code and we get the following results:
+-----+---+---------------------------+-------------+
| Name|Age|monotonically_increasing_id|increasing_id|
+-----+---+---------------------------+-------------+
|Alice| 10|                 8589934592|            1|
|Susan| 12|                25769803776|            2|
+-----+---+---------------------------+-------------+
If you need to increment based on the last updated maximum value, you can define a previous maximum value and then start counting from there.
We’re going to build on the example code that we just ran.
First, we need to define the value of previous_max_value. You would normally do this by fetching the value from your existing output table. For this example, we are going to define it as 1000.
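As a sketch of that lookup (the table name output_table is an assumption for illustration; substitute your own output table), you could fetch the current maximum like this:

%python

# Hedged sketch: read the running maximum from an existing output table.
# 'output_table' is a hypothetical name; fall back to 0 if the table is empty.
from pyspark.sql.functions import max as max_

previous_max_value = (
    spark.table("output_table")
    .agg(max_("increasing_id"))
    .collect()[0][0]
) or 0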
%python

previous_max_value = 1000
df_with_consecutive_increasing_id.withColumn(
    "consecutive_increase", col("increasing_id") + lit(previous_max_value)
).show()
When this is combined with the previous example code and run, we get the following results:
+-----+---+---------------------------+-------------+--------------------+
| Name|Age|monotonically_increasing_id|increasing_id|consecutive_increase|
+-----+---+---------------------------+-------------+--------------------+
|Alice| 10|                 8589934592|            1|                1001|
|Susan| 12|                25769803776|            2|                1002|
+-----+---+---------------------------+-------------+--------------------+