本文介绍如何使用 Apache Spark 函数在列中生成唯一递增数值。

我们回顾一下要使用的三种不同方法。 你应该选择最适合你的用例的方法。

在弹性分布式数据集 (RDD) 中使用 zipWithIndex()

zipWithIndex() 函数仅在 RDD 中可用。 不能直接在 DataFrame 上使用该函数。

将 DataFrame 转换为 RDD,对数据应用 zipWithIndex() ,然后将 RDD 转换回 DataFrame。

我们将使用以下示例代码向具有两个条目的基本表中添加唯一 ID 号。

df = spark.createDataFrame(
        ('Alice','10'),('Susan','12')
    ['Name','Age']
df1=df.rdd.zipWithIndex().toDF()
df2=df1.select(col("_1.*"),col("_2").alias('increasing_id'))
df2.show()

运行示例代码,得到以下结果:

+-----+---+-------------+
| Name|Age|increasing_id|
+-----+---+-------------+
|Alice| 10|            0|
|Susan| 12|            1|
+-----+---+-------------+

使用 monotonically_increasing_id() 表示唯一但不连续的数字

monotonically_increasing_id() 函数生成单调递增的 64 位整数。

生成的 ID 号保证递增且唯一,但不能保证它们是连续的。

我们将使用以下示例代码向具有两个条目的基本表中添加单调递增的 ID 号。

from pyspark.sql.functions import *
df_with_increasing_id = df.withColumn("monotonically_increasing_id", monotonically_increasing_id())
df_with_increasing_id.show()

运行示例代码,得到以下结果:

+-----+---+---------------------------+
| Name|Age|monotonically_increasing_id|
+-----+---+---------------------------+
|Alice| 10|                 8589934592|
|Susan| 12|                25769803776|
+-----+---+---------------------------+

monotonically_increasing_id()row_number() 合并为两列

row_number() 函数生成连续的数字。

将其与 monotonically_increasing_id() 合并以生成两列可用于标识数据条目的数字。

我们将使用以下示例代码向具有两个条目的基本表中添加单调递增的 ID 号和行号。

from pyspark.sql.functions import *
from pyspark.sql.window import *
window = Window.orderBy(col('monotonically_increasing_id'))
df_with_consecutive_increasing_id = df_with_increasing_id.withColumn('increasing_id', row_number().over(window))
df_with_consecutive_increasing_id.show()

运行示例代码,得到以下结果:

+-----+---+---------------------------+-------------+
| Name|Age|monotonically_increasing_id|increasing_id|
+-----+---+---------------------------+-------------+
|Alice| 10|                 8589934592|            1|
|Susan| 12|                25769803776|            2|
+-----+---+---------------------------+-------------+

如果需要根据最近更新的最大值递增,则可以定义一个先前的最大值,然后从该值开始计数。

以刚刚运行的示例代码为基础。

首先,需要定义 previous_max_value 的值。 通常通过从现有输出表中提取值来实现此目的。 对于此示例,我们将其定义为 1000。

previous_max_value = 1000
df_with_consecutive_increasing_id.withColumn("cnsecutiv_increase", col("increasing_id") + lit(previous_max_value)).show()

将该值与前面的示例代码合并,并运行后,会得到以下结果:

+-----+---+---------------------------+-------------+------------------+
| Name|Age|monotonically_increasing_id|increasing_id|cnsecutiv_increase|
+-----+---+---------------------------+-------------+------------------+
|Alice| 10|                 8589934592|            1|              1001|
|Susan| 12|                25769803776|            2|              1002|
+-----+---+---------------------------+-------------+------------------+