("Adam","Angra", "Anastasia"), ( "Boris" , "Borun" , "Bisma" ), ( "Shawn" , "Samar" , "Statham" ) ).toDF( "fname" , "mname" , "lname" ) df.createOrReplaceTempView( "df" )

I want my Spark SQL output to look like the following:

struct
{"data_description":"fname","data_details":"Adam"},{"data_description":"mname","data_details":"Angra"},{"data_description":"lname","data_details":"Anastasia"}
{"data_description":"fname","data_details":"Boris"},{"data_description":"mname","data_details":"Borun"},{"data_description":"lname","data_details":"Bisma"}
{"data_description":"fname","data_details":"Shawn"},{"data_description":"mname","data_details":"Samar"},{"data_description":"lname","data_details":"Statham"}

Here is what I have tried so far:

val df1 = spark.sql("""select concat(fname, ':', mname, ':', lname) as name from df""")
df1.createOrReplaceTempView("df1")
val df2 = spark.sql("""
  select
    named_struct('data_description', 'fname', 'data_details', split(name, ':')[0]) as struct1,
    named_struct('data_description', 'mname', 'data_details', split(name, ':')[1]) as struct2,
    named_struct('data_description', 'lname', 'data_details', split(name, ':')[2]) as struct3
  from df1
""")
df2.createOrReplaceTempView("df2")

Output of the above:

struct1 struct2 struct3
{"data_description":"fname","data_details":"Adam"}  {"data_description":"mname","data_details":"Angra"} {"data_description":"lname","data_details":"Anastasia"}
{"data_description":"fname","data_details":"Boris"} {"data_description":"mname","data_details":"Borun"} {"data_description":"lname","data_details":"Bisma"}
{"data_description":"fname","data_details":"Shawn"} {"data_description":"mname","data_details":"Samar"} {"data_description":"lname","data_details":"Statham"}

But I get 3 separate structs. I need all the structs in a single column, separated by commas.

sql, json, scala, apache-spark, apache-spark-sql
Venkatesh Gotimukul
Posted on 2021-12-20

2 Answers
过过招
Posted on 2021-12-20
0 upvotes

The SQL statement is as follows; the rest is as you already know.

val sql = """
    select
        concat_ws(
            ,concat('{"data_description":"fname","data_details":"',fname,'"}')
            ,concat('{"data_description":"mname","data_details":"',mname,'"}')
            ,concat('{"data_description":"lname","data_details":"',lname,'"}')
        ) as struct
    from df
    
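For completeness, a minimal usage sketch (assuming the spark session and the "df" temp view from the question): concat_ws joins the three per-column JSON strings with commas, producing one string per row.

// Run the statement above and inspect the result.
spark.sql(sql).show(false)
// Each row is now a single string of the desired shape, e.g.:
// {"data_description":"fname","data_details":"Adam"},{"data_description":"mname","data_details":"Angra"},{"data_description":"lname","data_details":"Anastasia"}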
blackbishop
Posted on 2021-12-20
0 upvotes

You can create an array of structs, and use to_json if you want the output as a string:

spark.sql("""
select  to_json(array(
          named_struct('data_description','fname','data_details', fname),
          named_struct('data_description','mname','data_details', mname), 
          named_struct('data_description','lname','data_details', lname) 
        )) as struct
from  df
""").show()
//+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
//|struct                                                                                                                                                          |
//+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
//|[{"data_description":"fname","data_details":"Adam"},{"data_description":"mname","data_details":"Angra"},{"data_description":"lname","data_details":"Anastasia"}]|
//|[{"data_description":"fname","data_details":"Boris"},{"data_description":"mname","data_details":"Borun"},{"data_description":"lname","data_details":"Bisma"}]   |
//|[{"data_description":"fname","data_details":"Shawn"},{"data_description":"mname","data_details":"Samar"},{"data_description":"lname","data_details":"Statham"}] |
//+----------------------------------------------------------------------------------------------------------------------------------------------------------------+

If you have many columns, you can generate the struct SQL expressions dynamically, like this:

val structs = df.columns.map(c => s"named_struct('data_description','$c','data_details', $c)").mkString(",")
val df2 = spark.sql(s"""
  select  to_json(array($structs)) as struct
  from  df
""")

If you don't want to use an array, you can simply concatenate the to_json results of the 3 structs:

val structs = df.columns.map(c => s"to_json(named_struct('data_description','$c','data_details', $c))").mkString(",")
val df2 = spark.sql(s"""