我有一个 pyspark 数据框架,包括一列,叫做
json
,其中每一行都是一个 unicode 的 json 字符串。我想解析每一行并返回一个新的数据框架,其中每一行都是解析过的json。
# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])
我试着用json.loads
对每一行进行映射。
.select('json')
.map(lambda x: json.loads(x))
.toDF()
).show()
但这返回了一个TypeError: expected string or buffer
。
我怀疑部分问题是,当从dataframe
转换到rdd
时,模式信息会丢失,所以我也试过手动输入模式信息。
schema = StructType([StructField('json', StringType(), True)])
rdd = (df
.select('json')
.map(lambda x: json.loads(x))
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
但我得到了同样的TypeError
。
看一看这个答案看起来用flatMap
将行平铺在这里可能有用,但我也没有成功。
schema = StructType([StructField('json', StringType(), True)])
rdd = (df
.select('json')
.flatMap(lambda x: x)
.flatMap(lambda x: json.loads(x))
.map(lambda x: x.get('body'))
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
I get this error: AttributeError: 'unicode' object has no attribute 'get'
.