这是一般情况下的解决方案,不需要提前知道数组的长度,使用
collect
,或使用
udf
s。不幸的是,这只适用于
spark
2.1及以上版本,因为它需要
posexplode
函数。
假设你有以下的数据框架。
df = spark.createDataFrame(
[1, 'A, B, C, D'],
[2, 'E, F, G'],
[3, 'H, I'],
[4, 'J']
, ["num", "letters"]
df.show()
分割letters
列,然后使用posexplode
,将结果数组与数组中的位置一起爆出。接下来使用pyspark.sql.functions.expr
来抓取这个数组中索引为pos
的元素。
import pyspark.sql.functions as f
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
.show()
现在我们从这个结果中创建两个新列。第一列是我们新列的名称,它将是letter
和数组中的索引的连接。第二列将是数组中相应索引的值。我们通过利用pyspark.sql.functions.expr
的功能得到后者,该功能允许我们使用列值作为参数。
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
.drop("val")\
.select(
"num",
f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
f.expr("letters[pos]").alias("val")
.show()
现在我们只需将groupBy
num
和pivot
DataFrame。把这一切放在一起,我们就得到了。
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
.drop("val")\
.select(
"num",
f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
f.expr("letters[pos]").alias("val")
.groupBy("num").pivot("name").agg(f.first("val"))\
.show()