内容来源于 Stack Overflow,遵循 CC BY-SA 4.0 许可协议进行翻译与使用。IT领域专用引擎提供翻译支持
腾讯云小微IT领域专用引擎提供翻译支持
我对pyspark和json解析还是个新手,我被困在了某些特定的场景中。让我先解释一下我要做什么,我有一个json文件,其中有一个数据元素,这个数据元素是一个包含另外两个json对象的数组。给定的json文件如下所示
{ "id": "da20d14c.92ba6", "type": "Data Transformation Node", "name": "", "topic": "", "x": 380, "y": 240, "typeofoperation":"join", "wires": [ ["da20d14c.92ba6","da20d14c.93ba6"] "output":true, "data":[ "metadata_id":"3434", "id":"1", "first_name":"Brose", "last_name":"Eayres", "email":"beayres0@archive.org", "gender":"Male", "postal_code":null "metadata_id":"3434", "id":"2", "first_name":"Brose", "last_name":"Eayres", "email":"beayres0@archive.org", "gender":"Male", "postal_code":null }
现在我要做的是一个接一个地迭代那个数据数组:意思是迭代到json的第一个对象,将其存储到一个dataframe中,然后迭代到第二个对象,并将其存储到另一个dataframe中,然后对它们进行完全连接或任何类型的连接。(这是可能的吗)
如果是,如何在pyspark中做到这一点。到目前为止,我所做的是
试图分解它,但数据是一次性分解的,而不是一个一个地分解
from pyspark.sql import SparkSession from pyspark.sql.functions import explode, col from pyspark.sql.functions import * from pyspark.sql import Row from pyspark.sql import SQLContext from pyspark import SparkConf, SparkContext spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .getOrCreate() sc = SparkContext.getOrCreate() dataFrame = spark.read.option("multiline", "true").json("nodeWithTwoRddJoin.json") dataNode = dataFrame.select(explode("data").alias("Data_of_node")) dataNode.show()
但是上面的代码给了我一个集合数据集。比我以前用的
firstDataSet = dataNode.collect()[0] secondDataSet = dataNode.collect()[1]
这些行给了我一行,但我不能将其返回到dataframe。任何建议和解决方案
这至少将它们放在两个数据帧中
from pyspark.sql.functions import monotonically_increasing_id df_with_id = dataNode.withColumn("id",monotonically_increasing_id()) max_id = df_with_id.agg({"id": "max"}).collect()[0]["max(id)"] first_df = df_with_id.where("id = {maxid}".format(maxid=max_id)) second_df = df_with_id.where("id != {maxid}".format(maxid=max_id))
您需要在数据帧的每一行上应用一个映射,该映射将其中一列的内容拆分为两个新列。将结果分成两个数据帧之后就很简单了。为此,我使用了一个简单的函数,它从数组中返回所需的索引:
def splitArray(array, pos): return array[pos]
您可以像这样应用此函数:
import pyspark.sql.functions as f mapped = dataFrame.select( splitArray(f.col('data'), 0).alias('first'), splitArray(f.col('data'), 1).alias('second'))
(我使用了内置的列功能来选择数据列。我不确定是否有更优雅的方法来实现这一点。)
结果是:
+-----------------------------------------------------+-----------------------------------------------------+