Pyspark:解析一列json字符串

53 人关注

我有一个 pyspark 数据框架,包括一列,叫做 json ,其中每一行都是一个 unicode 的 json 字符串。我想解析每一行并返回一个新的数据框架,其中每一行都是解析过的json。

# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])

我试着用json.loads对每一行进行映射。

.select('json') .map(lambda x: json.loads(x)) .toDF() ).show()

但这返回了一个TypeError: expected string or buffer

我怀疑部分问题是,当从dataframe转换到rdd时,模式信息会丢失,所以我也试过手动输入模式信息。

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .map(lambda x: json.loads(x))
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

但我得到了同样的TypeError

看一看这个答案看起来用flatMap将行平铺在这里可能有用,但我也没有成功。

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .flatMap(lambda x: x)
  .flatMap(lambda x: json.loads(x))
  .map(lambda x: x.get('body'))
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

I get this error: AttributeError: 'unicode' object has no attribute 'get'.

python
json
apache-spark
pyspark
Steve
Steve
发布于 2016-12-13
7 个回答
Martin Tapp
Martin Tapp
发布于 2022-08-05
已采纳
0 人赞同

For Spark 2.1+ ,你可以使用 from_json 这允许在数据框架内保留其他非json列,如下所示。

from pyspark.sql.functions import from_json, col
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df.withColumn('json', from_json(col('json'), json_schema))

你让Spark推导出json字符串列的模式。那么df.json列就不再是StringType了,但是正确解码的json结构,即嵌套的StrucTypedf的所有其他列都被原样保留了。

你可以按以下方式访问json内容。

df.select(col('json.header').alias('header'))
    
当我用流式数据框架(结构化流)尝试时,我得到一个错误,即用流式来源的查询必须用writeStream.start();;/nkafka执行。能否请你帮助我如何使用来自kafka流的JSON数据。
只需使用普通的dataframe/rdd,从一批/样本数据中提取json模式。然后,在你的流媒体应用程序中使用提取的模式。
嗨,你能告诉我你的代码中的 col 是什么吗? 是'json'列对象吗?
这是一个Spark函数,你可以导入看到 spark.apache.org/docs/2.4.0/api/python/...
让Spark推导出json的模式会不会效率很低?
Mariusz
Mariusz
发布于 2022-08-05
0 人赞同

如果你在之前将数据框架转换为字符串的RDD,那么在spark中将带有json字符串的数据框架转换为结构化的数据框架其实很简单(见。 http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets )

>>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()
 |-- body: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- sub_json: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- sub_sub_json: struct (nullable = true)
 |    |    |    |-- col1: long (nullable = true)
 |    |    |    |-- col2: string (nullable = true)
 |-- header: struct (nullable = true)
 |    |-- foo: string (nullable = true)
 |    |-- id: long (nullable = true)
    
这很好--谢谢!有什么方法可以将结构类型转换为地图类型吗?在我的代码的后面,我通过 explode 解析出每个maptype的列。
啊,我想我已经明白了。我可以通过这样的方式避免使用maptypes。 body = new_df.select('body').rdd.map(lambda r: r.body).toDF()
实际上,这要简单得多:只要输入 new_df.select('body') ,你就会得到只有主体对象的数据框架。
酷!,有没有办法将新的数据框架与原来的数据框架连接起来(除了json字符串,还有其他字段)?
@OphirYoktan 很遗憾没有。对于这一点,我推荐马丁的答案中描述的 from_json
Nolan Conaway
Nolan Conaway
发布于 2022-08-05
0 人赞同

如果你的JSON不是完美的/传统的格式化,现有的答案就不起作用。例如,基于RDD的模式推理期望JSON在大括号中 {} ,并将提供一个不正确的模式(导致 null 的值),如果,你的数据看起来像。

"a": 1.0, "b": 1 "a": 0.0, "b": 2

我写了一个函数来解决这个问题,通过对JSON进行消毒,使其存在于另一个JSON对象中。

def parseJSONCols(df, *cols, sanitize=True):
    """Auto infer the schema of a json column and parse into a struct.
    rdd-based schema inference works if you have well-formatted JSON,
    like ``{"key": "value", ...}``, but breaks if your 'JSON' is just a
    string (``"data"``) or is an array (``[1, 2, 3]``). In those cases you
    can fix everything by wrapping the data in another JSON object
    (``{"key": [1, 2, 3]}``). The ``sanitize`` option (default True)
    automatically performs the wrapping and unwrapping.
    The schema inference is based on this
    `SO Post <https://stackoverflow.com/a/45880574)/>`_.
    Parameters
    ----------
    df : pyspark dataframe
        Dataframe containing the JSON cols.
    *cols : string(s)
        Names of the columns containing JSON.
    sanitize : boolean
        Flag indicating whether you'd like to sanitize your records
        by wrapping and unwrapping them in another JSON object layer.
    Returns
    -------
    pyspark dataframe
        A dataframe with the decoded columns.
    res = df
    for i in cols:
        # sanitize if requested.
        if sanitize:
            res = (
                res.withColumn(
                    psf.concat(psf.lit('{"data": '), i, psf.lit('}'))
        # infer schema and apply it
        schema = spark.read.json(res.rdd.map(lambda x: x[i])).schema
        res = res.withColumn(i, psf.from_json(psf.col(i), schema))
        # unpack the wrapped object if needed
        if sanitize:
            res = res.withColumn(i, psf.col(i).data)
    return res

Note: psf = pyspark.sql.functions.

> For example, the RDD-based schema inference expects JSON in curly-braces where did you read this? awesome find!
"你在哪里读到这个?".我不能说我在哪里读到的,我只是发现 pyspark 不能解析我的 JSON,除非这是真的。
Buthetleon
Buthetleon
发布于 2022-08-05
0 人赞同

这里是@nolan-conaway的 parseJSONCols 函数的简明(火花SQL)版本。

SELECT 
explode(
    from_json(
        concat('{"data":', 
               '[{"a": 1.0,"b": 1},{"a": 0.0,"b": 2}]', 
               '}'), 
        'data array<struct<a:DOUBLE, b:INT>>'
    ).data) as data;
  

PS.我也添加了爆炸功能 :P

You'll need to know some HIVE SQL types

S.W.Zhang
S.W.Zhang
发布于 2022-08-05
0 人赞同
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
def map2json(dict):
    import json
    return json.dumps(dict)
from pyspark.sql.types import StringType
spark.udf.register("map2json", lambda dict: map2json(dict), StringType())
spark.sql("select map2json(map('a', '1'))").show()
    
ZettaP
ZettaP
发布于 2022-08-05
0 人赞同

如果你不知道每个JSON的模式(它可能是不同的),你可以使用.NET技术。

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# ... here you get your DF
# Assuming the first column of your DF is the JSON to parse
my_df = spark.read.json(my_df.rdd.map(lambda x: x[0]))

请注意,它不会保留你的数据集中的任何其他列。 来自:https://github.com/apache/spark/pull/22775

Mr. Low
Mr. Low
发布于 2022-08-05
0 人赞同

如果你的JSON字符串是JSON数组而不是对象(我没有代表,所以无法评论),这个答案是为了增加背景。如果你使用 马丁-塔普的坚实答案 它将为你的列返回空值。

tl;dr

如果你的JSON字符串是像这样的数组对象。

[{"a":1, "b":1.0}]

替换代码1】将返回一个数据框架,该框架包含了elements而不是包括数组本身。from_json对这一点并不满意,所以为了达到它想要的具体效果,你可以将由spark.read.json推断出的模式包裹在ArrayType中,它将正确解析(而不是为所有东西返回空值)。

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType
array_item_schema = \
  spark.read.json(df.rdd.map(lambda row: row['json_string_column'])).schema
json_array_schema = ArrayType(array_item_schema, True)
arrays_df = df.select(F.from_json('json_string_column', json_array_schema).alias('json_arrays'))
objects_df = arrays_df.select(F.explode('json_arrays').alias('objects'))

Intro

作为对Nolan Conaway的补充,似乎当你的JSON的形式为

"a": 1.0, "b": 1 "a": 0.0, "b": 2

在顶层对象是一个数组(而不是一个对象)的情况下,pyspark的spark.read.json()将数组作为一个对象的集合来处理,将其转换为行,而不是单一的行。

参见在PySpark 3.3.0 shell中运行的例子。

>>> myjson        = """[{"a": 1.0,"b": 1},{"a": 2.0,"b": 2}]"""
>>> myotherjson   = """[{"a": 3.0,"b": 3}]"""
>>> rawobjectjson = """{"a": 4.0,"b": 4}"""
>>> spark_read_df = spark.read.json(sc.parallelize([myjson,myotherjson,rawobjectjson]))
>>> spark_read_df.show()
+---+---+
|  a|  b|
+---+---+
|1.0|  1|
|2.0|  2|
|3.0|  3|
|4.0|  4|
+---+---+
>>> spark_read_df.printSchema()
 |-- a: double (nullable = true)
 |-- b: long (nullable = true)

我们可以看到,myjsonmyotherjson是JSON对象的JSON数组,它们被扩展为每个对象都有一行。当其中一个JSON字符串rawobjectjson只是一个原始对象时,它也能顺利处理。我认为文档在这里有一点不足,因为我找不到关于数组对象的处理方法。

现在让我们创建一个带有JSON字符串列的数据框架。我们将放弃rawobjectjson,因为我们将看到from_json要求每个字符串都有相同的模式(而这个includes如果有的话,就是顶层阵列)。

>>> from pyspark.sql.types import StructType, StructField, StringType, ArrayType
>>> json_string_data = [
...     (myjson,),
...     (myotherjson,),
... ]
>>> json_df_schema = StructType([
...     StructField('json_strings', StringType(), True),
... ])
>>> raw_json_df = spark.createDataFrame(data=json_string_data, schema=json_df_schema)
>>> raw_json_df.show()
+--------------------+
|        json_strings|
+--------------------+
|[{"a": 1.0,"b": 1...|
| [{"a": 3.0,"b": 3}]|
+--------------------+

现在我试图用spark.read.json推断出的模式传递给from_json,把JSON列读成对象。但它一直返回完全null的列。.正如Nolan Conaway提到的,当传递给from_json的模式无法应用于给定的字符串时,就会发生这种情况。

问题在于,在这些字符串中,它将顶层视为一个数组,但正如spark_read_df.printSchema()所显示的,由spark.read.json()推断出的模式忽略了数组层。

The Solution

因此,我最终采用的解决方案是,在进行读取时,只考虑模式中的顶层数组。

from pyspark.sql import functions as F
# This one won't work for directly passing to from_json as it ignores top-level arrays in json strings
# (if any)!
# json_object_schema = spark_read_df.schema()
# from_json is a bit more "simple", it directly applies the schema to the string. In this case
# the top level type is actually an array, so a simple fix is to just wrap the schema that
# spark.read.json returned in an ArrayType to match the true JSON string
json_array_schema = ArrayType(spark_read_df.schema, True)
json_extracted_df = raw_json_df.select(
    F.from_json('json_strings', json_array_schema)
        .alias('json_arrays')
>>> json_extracted_df.show()
+--------------------+
|         json_arrays|
+--------------------+
|[{1.0, 1}, {2.0, 2}]|
|          [{3.0, 3}]|
+--------------------+
>>> json_extracted_df.printSchema()
 |-- json_arrays: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: double (nullable = true)
 |    |    |-- b: long (nullable = true)

从那里可以用pyspark.sql.functions.explode将对象从阵列中拉出来。

>>> exploded_df = json_extracted_df.select(F.explode('json_arrays').alias('objects'))
>>> exploded_df.show()
+--------+
| objects|
+--------+
|{1.0, 1}|
|{2.0, 2}|
|{3.0, 3}|
+--------+
>>> exploded_df.printSchema()