相关文章推荐
逼格高的铅笔  ·  javascript - ...·  1 年前    · 
活泼的针织衫  ·  java - Is there a ...·  1 年前    · 
爽快的红烧肉  ·  How to color series ...·  1 年前    · 
酒量小的石榴  ·  java jxl ...·  1 年前    · 

Json Structure is -:
aa.json

[[{"foo":"test1"},{"foo1":"test21"}],
[{"foo":"test2"},{"foo1":"test22"}],
[{"foo":"test3"},{"foo1":"test23"}]]
用于读取DataFrame的代码:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

a=sqlContext.read.option('multiline',"true").json('aa.json');

a.show()
foo foo1
null null

a.printSchema()
root
|-- foo: string (nullable = true)
|-- foo1: string (nullable = true)
以下是读取此json的行,它可以解析模式而不是数据。

应用一些正则表达式并转换为rdd可能对您有用。

先使用textFile以下方法读取文件:

a=spark.read.option('multiline',"true").text('aa.json')
a.show(truncate=False)

+-------------------------------------+

|value |

+-------------------------------------+

|[[{"foo":"test1"},{"foo1":"test21"}],|

|[{"foo":"test2"},{"foo1":"test22"}], |

|[{"foo":"test3"},{"foo1":"test23"}]] |

+-------------------------------------+

现在我们可以使用pyspark.sql.functions.regexp_replace从每行中删除额外的方括号和尾随逗号:

from pyspark.sql.functions import regexp_replace
a = a.select(regexp_replace("value", "(^[(?=[))|((?<=])]$)|(,$)", "").alias("value"))
a.show(truncate=False)

+-----------------------------------+

|value |

+-----------------------------------+

|[{"foo":"test1"},{"foo1":"test21"}]|

|[{"foo":"test2"},{"foo1":"test22"}]|

|[{"foo":"test3"},{"foo1":"test23"}]|

+-----------------------------------+

这里的模式是逻辑或以下模式:

^[(?=[):字符串开头后跟[[(第二[个是非捕获组)
(?<=])]$:]]在字符串的末尾(第]一个是非捕获组)
,$:字符串末尾的逗号
任何匹配的模式都将替换为空字符串。

现在转换为rdd并使用json.loads将行解析为字典列表。然后将所有这些字典合并到一个字典中并调用pyspark.sql.Row构造函数。最后调用.toDF转换回DataFrame。

From How to merge two dictionaries in a single expression?

This code works for python 2 and 3

def merge_two_dicts(x, y):

z = x.copy()   # start with x's keys and values
z.update(y)    # modifies z with y's keys and values & returns None
return z

import json
from pyspark.sql import Row
from functools import reduce

a.rdd.map(lambda x: Row(**reduce(merge_two_dicts, json.loads(x['value'])))).toDF().show()

+-----+------+

| foo| foo1|

+-----+------+

|test1|test21|

|test2|test22|

|test3|test23|

+-----+------+

2019-07-17 23:23:00 企业邮箱发送邮件时,若出现投递失败产生退信,内容提示包含如下: the mta server of * reply:550 failed to meet SPF requirements 或者 the mta server of 163.com — 163mx01.mxmail.netease.com(220.181.14.141) reply:550 MI:SPF mx14,QMCowECpA0qTiftVaeB3Cg—.872S2 1442548128 http://mail.163.com/help 298984 Spring Boot 如果防护 XSS + SQL 注入攻击 ?一文带你搞定! 阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!(2) Spring Boot 接口加解密,新姿势来了! Socket学习网络基础 任务拆解,悠然自得,自动版本的ChatGPT,AutoGPT自动人工智能AI任务实践(Python3.10)