Json Structure is -:
aa.json
[[{"foo":"test1"},{"foo1":"test21"}],
[{"foo":"test2"},{"foo1":"test22"}],
[{"foo":"test3"},{"foo1":"test23"}]]
用于读取DataFrame的代码:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
a=sqlContext.read.option('multiline',"true").json('aa.json');
a.show()
|
foo
|
foo1
|
null
|
null
|
a.printSchema()
root
|-- foo: string (nullable = true)
|-- foo1: string (nullable = true)
以下是读取此json的行,它可以解析模式而不是数据。
应用一些正则表达式并转换为rdd可能对您有用。
先使用textFile以下方法读取文件:
a=spark.read.option('multiline',"true").text('aa.json')
a.show(truncate=False)
+-------------------------------------+
|value |
+-------------------------------------+
|[[{"foo":"test1"},{"foo1":"test21"}],|
|[{"foo":"test2"},{"foo1":"test22"}], |
|[{"foo":"test3"},{"foo1":"test23"}]] |
+-------------------------------------+
现在我们可以使用pyspark.sql.functions.regexp_replace从每行中删除额外的方括号和尾随逗号:
from pyspark.sql.functions import regexp_replace
a = a.select(regexp_replace("value", "(^[(?=[))|((?<=])]$)|(,$)", "").alias("value"))
a.show(truncate=False)
+-----------------------------------+
|value |
+-----------------------------------+
|[{"foo":"test1"},{"foo1":"test21"}]|
|[{"foo":"test2"},{"foo1":"test22"}]|
|[{"foo":"test3"},{"foo1":"test23"}]|
+-----------------------------------+
这里的模式是逻辑或以下模式:
^[(?=[):字符串开头后跟[[(第二[个是非捕获组)
(?<=])]$:]]在字符串的末尾(第]一个是非捕获组)
,$:字符串末尾的逗号
任何匹配的模式都将替换为空字符串。
现在转换为rdd并使用json.loads将行解析为字典列表。然后将所有这些字典合并到一个字典中并调用pyspark.sql.Row构造函数。最后调用.toDF转换回DataFrame。
From
How to merge two dictionaries in a single expression?
This code works for python 2 and 3
def merge_two_dicts(x, y):
z = x.copy() # start with x's keys and values
z.update(y) # modifies z with y's keys and values & returns None
return z
import json
from pyspark.sql import Row
from functools import reduce
a.rdd.map(lambda x: Row(**reduce(merge_two_dicts, json.loads(x['value'])))).toDF().show()
+-----+------+
| foo| foo1|
+-----+------+
|test1|test21|
|test2|test22|
|test3|test23|
+-----+------+
2019-07-17 23:23:00
企业邮箱发送邮件时,若出现投递失败产生退信,内容提示包含如下: the mta server of * reply:550 failed to meet SPF requirements 或者 the mta server of 163.com — 163mx01.mxmail.netease.com(220.181.14.141) reply:550 MI:SPF mx14,QMCowECpA0qTiftVaeB3Cg—.872S2 1442548128 http://mail.163.com/help
298984
Spring Boot 如果防护 XSS + SQL 注入攻击 ?一文带你搞定!
阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!(2)
Spring Boot 接口加解密,新姿势来了!
Socket学习网络基础
任务拆解,悠然自得,自动版本的ChatGPT,AutoGPT自动人工智能AI任务实践(Python3.10)