将Spark Dataframe的字符串列分割成多列

96 人关注

我看到不同的人建议 Dataframe.explode ,这是一个有用的方法,但它的结果是比原始数据框架的行数更多,这根本不是我想要的。我只是想做一个非常简单的Dataframe的等价物。

rdd.map(lambda row: row + [row.my_str_col.split('-')])

它需要一些看起来像。

col1 | my_str_col
-----+-----------
  18 |  856-yygrm
 201 |  777-psgdg

并将其转换为这个。

col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg

我知道pyspark.sql.functions.split() ,但它的结果是一个嵌套的数组列,而不是我想要的两个顶层列。

理想情况下,我希望这些新列也能被命名。

apache-spark
pyspark
apache-spark-sql
Peter Gaultney
Peter Gaultney
发布于 2016-08-31
4 个回答
Peter Gaultney
Peter Gaultney
发布于 2021-11-11
0 人赞同

pyspark.sql.functions.split() 是这里的正确方法--你只需要将嵌套的ArrayType列平铺成多个顶层列。在这种情况下,每个数组只包含2个项目,这很容易。你只需使用 ,将数组的每一部分作为一个列本身来检索。 Column.getItem()

split_col = pyspark.sql.functions.split(df['my_str_col'], '-')
df = df.withColumn('NAME1', split_col.getItem(0))
df = df.withColumn('NAME2', split_col.getItem(1))

其结果将是。

col1 | my_str_col | NAME1 | NAME2
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg

我不确定在一般情况下,如果嵌套的数组从行到行的大小不一样,我将如何解决这个问题。

有没有办法把剩下的项目放在一列?即 split_col.getItem(2 - n) ,放在第三列。我想类似于上面的循环为所有项目建立列,然后将它们串联起来的方法可能可行,但我不知道这是否非常有效。
使用df.withColumn('NAME_remaining', pyspark.sql.functions.split(df[my_str_col'], '-',3).getItem(2)来获取剩余的项目。 spark.apache.org/docs/latest/api/sql/index.html
我发现,如果你试图将其中一个拆分项分配回原列,你必须在拆分前用ColumnRenamed()重命名原列,以避免出现显然与 issues.apache.org/jira/browse/SPARK-14948 有关的错误。
如何进行分割,使分割的第一部分是columnname,第二部分是列值?
pault
pault
发布于 2021-11-11
0 人赞同

这是一般情况下的解决方案,不需要提前知道数组的长度,使用 collect ,或使用 udf s。不幸的是,这只适用于 spark 2.1及以上版本,因为它需要 posexplode 函数。

假设你有以下的数据框架。

df = spark.createDataFrame(
        [1, 'A, B, C, D'], 
        [2, 'E, F, G'], 
        [3, 'H, I'], 
        [4, 'J']
    , ["num", "letters"]
df.show()
#+---+----------+
#|num|   letters|
#+---+----------+
#|  1|A, B, C, D|
#|  2|   E, F, G|
#|  3|      H, I|
#|  4|         J|
#+---+----------+

分割letters 列,然后使用posexplode ,将结果数组与数组中的位置一起爆出。接下来使用pyspark.sql.functions.expr 来抓取这个数组中索引为pos 的元素。

import pyspark.sql.functions as f
df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    .show()
#+---+------------+---+---+
#|num|     letters|pos|val|
#+---+------------+---+---+
#|  1|[A, B, C, D]|  0|  A|
#|  1|[A, B, C, D]|  1|  B|
#|  1|[A, B, C, D]|  2|  C|
#|  1|[A, B, C, D]|  3|  D|
#|  2|   [E, F, G]|  0|  E|
#|  2|   [E, F, G]|  1|  F|
#|  2|   [E, F, G]|  2|  G|
#|  3|      [H, I]|  0|  H|
#|  3|      [H, I]|  1|  I|
#|  4|         [J]|  0|  J|
#+---+------------+---+---+

现在我们从这个结果中创建两个新列。第一列是我们新列的名称,它将是letter 和数组中的索引的连接。第二列将是数组中相应索引的值。我们通过利用pyspark.sql.functions.expr 的功能得到后者,该功能允许我们使用列值作为参数

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    .drop("val")\
    .select(
        "num",
        f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
        f.expr("letters[pos]").alias("val")
    .show()
#+---+-------+---+
#|num|   name|val|
#+---+-------+---+
#|  1|letter0|  A|
#|  1|letter1|  B|
#|  1|letter2|  C|
#|  1|letter3|  D|
#|  2|letter0|  E|
#|  2|letter1|  F|
#|  2|letter2|  G|
#|  3|letter0|  H|
#|  3|letter1|  I|
#|  4|letter0|  J|
#+---+-------+---+

现在我们只需将groupBy numpivot DataFrame。把这一切放在一起,我们就得到了。

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    .drop("val")\
    .select(
        "num",
        f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
        f.expr("letters[pos]").alias("val")
    .groupBy("num").pivot("name").agg(f.first("val"))\
    .show()
#+---+-------+-------+-------+-------+
#|num|letter0|letter1|letter2|letter3|
#+---+-------+-------+-------+-------+
#|  1|      A|      B|      C|      D|
#|  3|      H|      I|   null|   null|
#|  2|      E|      F|      G|   null|
#|  4|      J|   null|   null|   null|
#+---+-------+-------+-------+-------+
    
我试着用3909个元素来分割~1.7M的原始行,它太慢了/一个小时后没有完成。
Luca Soato
Luca Soato
发布于 2021-11-11
0 人赞同

这是另一种方法,以备你想用分隔符来分割字符串。

import pyspark.sql.functions as f
df = spark.createDataFrame([("1:a:2001",),("2:b:2002",),("3:c:2003",)],["value"])
df.show()
+--------+
|   value|
+--------+
|1:a:2001|
|2:b:2002|
|3:c:2003|
+--------+
df_split = df.select(f.split(df.value,":")).rdd.flatMap(
              lambda x: x).toDF(schema=["col1","col2","col3"])
df_split.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   a|2001|
|   2|   b|2002|
|   3|   c|2003|
+----+----+----+

我不认为这种向RDDs的来回过渡会拖累你...... 也不用担心最后的模式规范:它是可选的,你可以避免它将解决方案泛化到具有未知列大小的数据。

我怎样才能在scala中做到这一点?我被flatMap lambda函数卡住了
请注意,该模式是以正则表达式的形式给出的,因此你需要对特殊字符使用()。
如果你不想在你的表达式中引用 df ,你可以把列的名字传给 split ,即 df.select(f.split("value",":"))...
@moshebeeri 你救了我!
cgapperi
cgapperi
发布于 2021-11-11
0 人赞同

我理解你的痛苦。使用split()可以工作,但也可能导致中断。

让我们以你的df为例,对它做一个小小的改动。

df = spark.createDataFrame([('1:"a:3":2001',),('2:"b":2002',),('3:"c":2003',)],["value"]) 
df.show()
+------------+
|       value|
+------------+
|1:"a:3":2001|
|  2:"b":2002|
|  3:"c":2003|
+------------+

如果你试图按照上面的方法对其应用split()。

df_split = df.select(split(df.value,":")).rdd.flatMap(
              lambda x: x).toDF(schema=["col1","col2","col3"]).show()

IllegalStateException。输入行没有模式所要求的预期数值数量。4个字段是必需的,而提供的是3个值。

那么,有没有一种更优雅的方法来解决这个问题呢?我很高兴有人给我指出了这个问题。pyspark.sql.functions.from_csv()是你的朋友。

以我上面的例子df为例。

from pyspark.sql.functions import from_csv
# Define a column schema to apply with from_csv()
col_schema = ["col1 INTEGER","col2 STRING","col3 INTEGER"]
schema_str = ",".join(col_schema)
# define the separator because it isn't a ','
options = {'sep': ":"}
# create a df from the value column using schema and options
df_csv = df.select(from_csv(df.value, schema_str, options).alias("value_parsed"))
df_csv.show()
+--------------+
|  value_parsed|
+--------------+
|[1, a:3, 2001]|
|  [2, b, 2002]|
|  [3, c, 2003]|
+--------------+

那么我们就可以很容易地将df平铺到列中的值。

df2 = df_csv.select("value_parsed.*").toDF("col1","col2","col3")
df2.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1| a:3|2001|