相关文章推荐
爱吹牛的瀑布  ·  PySpark 读写 JSON 文件到 ...·  1 月前    · 
活泼的高山  ·  python在excel增加新一行作为标题行 ...·  6 月前    · 
淡定的仙人球  ·  木偶师page.goTo()永远不会完成-腾 ...·  9 月前    · 
有爱心的卡布奇诺  ·  给自己写一个案件管理工具(二) - 少数派·  1 年前    · 
逃跑的灭火器  ·  容器环境与可插拔设备(一)---udev - 知乎·  1 年前    · 
善良的伏特加  ·  SSH配置Linux免密登录_51CTO博客 ...·  1 年前    · 
Code  ›  PySpark 读写 JSON 文件到 DataFrame开发者社区
schema spark dataframe zipcode
https://cloud.tencent.com/developer/article/2322961
爱吹牛的瀑布
1 月前
数据STUDIO

PySpark 读写 JSON 文件到 DataFrame

前往小程序,Get 更优 阅读体验!
立即前往
腾讯云
开发者社区
文档 建议反馈 控制台
首页
学习
活动
专区
圈层
工具
文章/答案/技术大牛
发布
首页
学习
活动
专区
圈层
工具
返回腾讯云官网
数据STUDIO
首页
学习
活动
专区
圈层
工具
返回腾讯云官网
社区首页 > 专栏 > PySpark 读写 JSON 文件到 DataFrame

PySpark 读写 JSON 文件到 DataFrame

作者头像
数据STUDIO
发布 于 2023-09-04 12:37:02
发布 于 2023-09-04 12:37:02
1.2K 0 0
代码可运行
举报
文章被收录于专栏: 数据STUDIO 数据STUDIO
运行总次数: 0
代码可运行
本文中,云朵君将和大家一起学习了如何将具有单行记录和多行记录的 JSON 文件读取到 PySpark DataFrame 中,还要学习一次读取单个和多个文件以及使用不同的保存选项将 JSON 文件写回 DataFrame。

PySpark SQL 提供 read.json("path") 将单行或多行(多行)JSON 文件读取到 PySpark DataFrame 并 write.json("path") 保存或写入 JSON 文件的功能,在本教程中,您将学习如何读取单个文件、多个文件、目录中的所有文件进入 DataFrame 并使用 Python 示例将 DataFrame 写回 JSON 文件。

注意: 开箱即用的 PySpark API 支持将 JSON 文件和更多文件格式读取到 PySpark DataFrame 中。

使用 read.json("path") 或者 read.format("json").load("path") 方法将文件路径作为参数,可以将 JSON 文件读入 PySpark DataFrame。

与读取 CSV 不同,默认情况下,来自输入文件的 JSON 数据源推断模式。

此处使用的 zipcodes.json 文件可以从 GitHub 项目下载。

传送门: https://github.com/spark-examples/pyspark-examples/blob/master/resources/zipcodes.json

代码语言: javascript
代码 运行次数: 0
运行
复制
# Read JSON file into dataframe
df = spark.read.json("PyDataStudio/zipcodes.json")
df.printSchema()
df.show()

当使用 format("json") 方法时,还可以通过其完全限定名称指定数据源,如下所示。

代码语言: javascript
代码 运行次数: 0
运行
复制
# Read JSON file into dataframe
df = spark.read.format('org.apache.spark.sql.json') \
        .load("PyDataStudio/zipcodes.json")

从多行读取 JSON 文件

PySpark JSON 数据源在不同的选项中提供了多个读取文件的选项,使用 multiline 选项读取分散在多行的 JSON 文件。默认情况下,多行选项设置为 false。

下面是我们要读取的输入文件,同样的文件也可以在Github上找到。

传送门: https://github.com/spark-examples/pyspark-examples/blob/master/resources/multiline-zipcode.json

代码语言: javascript
代码 运行次数: 0
运行
复制
[{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"

使用 read.option("multiline","true")

代码语言: javascript
代码 运行次数: 0
运行
复制
# Read multiline json file
multiline_df = spark.read.option("multiline","true") \
      .json("PyDataStudio/multiline-zipcode.json")
multiline_df.show()    

一次读取多个文件

还可以使用 read.json() 方法从不同路径读取多个 JSON 文件,只需通过逗号分隔传递所有具有完全限定路径的文件名,例如

代码语言: javascript
代码 运行次数: 0
运行
复制
# Read multiple files
df2 = spark.read.json(
    ['resources/zipcode1.json',
     'resources/zipcode2.json'])
df2.show()  

读取目录中的所有文件

只需将目录作为 json() 方法的路径传递给该方法,我们就可以将目录中的所有 JSON 文件读取到 DataFrame 中。

代码语言: javascript
代码 运行次数: 0
运行
复制
# Read all JSON files from a folder
df3 = spark.read.json("resources/*.json")
df3.show()

使用用户自定义架构读取文件

PySpark Schema 定义了数据的结构,换句话说,它是 DataFrame 的结构。PySpark SQL 提供 StructType 和 StructField 类以编程方式指定 DataFrame 的结构。

如果事先知道文件的架构并且不想使用 inferSchema 选项来指定列名和类型,请使用指定的自定义列名schema并使用 schema 选项键入。

使用 PySpark StructType 类创建自定义 Schema,下面我们启动这个类并使用添加方法通过提供列名、数据类型和可为空的选项向其添加列。

代码语言: javascript
代码 运行次数: 0
运行
复制
# Define custom schema
schema = StructType([
      StructField("RecordNumber",IntegerType(),True),
      StructField("Zipcode",IntegerType(),True),
      StructField("ZipCodeType",StringType(),True),
      StructField("City",StringType(),True),
      StructField("State",StringType(),True),
      StructField("LocationType",StringType(),True),
      StructField("Lat",DoubleType(),True),
      StructField("Long",DoubleType(),True),
      StructField("Xaxis",IntegerType(),True),
      StructField("Yaxis",DoubleType(),True),
      StructField("Zaxis",DoubleType(),True),
      StructField("WorldRegion",StringType(),True),
      StructField("Country",StringType(),True),
      StructField("LocationText",StringType(),True),
      StructField("Location",StringType(),True),
      StructField("Decommisioned",BooleanType(),True),
      StructField("TaxReturnsFiled",StringType(),True),
      StructField("EstimatedPopulation",IntegerType(),True),
      StructField("TotalWages",IntegerType(),True),
      StructField("Notes",StringType(),True)
df_with_schema = spark.read.schema(schema) \
        .json("PyDataStudio/zipcodes.json")
df_with_schema.printSchema()
df_with_schema.show()

使用 PySpark SQL 读取 JSON 文件

PySpark SQL 还提供了一种读取 JSON 文件的方法,方法是使用 spark.sqlContext.sql(“将 JSON 加载到临时视图”) 直接从读取文件创建临时视图

代码语言: javascript
代码 运行次数: 0
运行
复制
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS" + 
      " (path 'PyDataStudio/zipcodes.json')")
spark.sql("select * from zipcode").show()

读取 JSON 文件时的选项

NullValues

使用 nullValues 选项,可以将 JSON 中的字符串指定为 null。例如,如果想考虑一个值为 1900-01-01 的日期列,则在 DataFrame 上设置为 null。

DateFormat

选项 dateFormat 用于设置输入 DateType 和 TimestampType 列的格式的选项。支持所有 java.text.SimpleDateFormat 格式。

注意: 除了上述选项外,PySpark JSON 数据集还支持许多其他选项。

应用 DataFrame 转换

从 JSON 文件创建 PySpark DataFrame 后,可以应用 DataFrame 支持的所有转换和操作。

将 PySpark DataFrame 写入 JSON 文件

在 DataFrame 上使用 PySpark DataFrameWriter 对象 write 方法写入 JSON 文件。

代码语言: javascript
代码 运行次数: 0
运行
复制
df2.write.json("/PyDataStudio/spark_output/zipcodes.json")

编写 JSON 文件时的 PySpark 选项

在编写 JSON 文件时,可以使用多个选项。如 nullValue , dateFormat

PySpark 保存模式

PySpark DataFrameWriter 还有一个方法 mode() 来指定 SaveMode;此方法的参数采用 overwrite , append , ignore , errorifexists .

  • overwrite – 模式用于覆盖现有文件
  • append – 将数据添加到现有文件
  • ignore – 当文件已经存在时忽略写操作
  • errorifexists 或 error – 这是文件已存在时的默认选项,它返回错误
代码语言: javascript
代码 运行次数: 0
运行
复制
 df2.write.mode('Overwrite') \
       .json("/PyDataStudio/spark_output/zipcodes.json")

源代码供参考

此示例也可在GitHub PySpark 示例项目中获得以供参考。

代码语言: javascript
代码 运行次数: 0
运行
复制
# https://github.com/spark-examples/pyspark-examples/blob/master/pyspark-read-json.py
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,BooleanType,DoubleType
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()
# Read JSON file into dataframe    
df = spark.read.json("PyDataStudio/zipcodes.json")
df.printSchema()
df.show()
# Read multiline json file
multiline_df = spark.read.option("multiline","true") \
      .json("PyDataStudio/multiline-zipcode.json")
multiline_df.show()
#Read multiple files
df2 = spark.read.json(
    ['PyDataStudio/zipcode2.json','PyDataStudio/zipcode1.json'])
df2.show()    
#Read All JSON files from a directory
df3 = spark.read.json("PyDataStudio/*.json")
df3.show()
# Define custom schema
schema = StructType([
      StructField("RecordNumber",IntegerType(),True),
      StructField("Zipcode",IntegerType(),True),
      StructField("ZipCodeType",StringType(),True),
      StructField("City",StringType(),True),
      StructField("State",StringType(),True),
      StructField("LocationType",StringType(),True),
      StructField("Lat",DoubleType(),True),
      StructField("Long",DoubleType(),True),
      StructField("Xaxis",IntegerType(),True),
      StructField("Yaxis",DoubleType(),True),
      StructField("Zaxis",DoubleType(),True),
      StructField("WorldRegion",StringType(),True),
      StructField("Country",StringType(),True),
      StructField("LocationText",StringType(),True),
      StructField("Location",StringType(),True),
      StructField("Decommisioned",BooleanType(),True),
      StructField("TaxReturnsFiled",StringType(),True),
      StructField("EstimatedPopulation",IntegerType(),True),
      StructField("TotalWages",IntegerType(),True),
      StructField("Notes",StringType(),True)
df_with_schema = spark.read.schema(schema) \
        .json("PyDataStudio/zipcodes.json")
df_with_schema.printSchema()
df_with_schema.show()
# Create a table from Parquet File
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode3 USING json OPTIONS" + 
      " (path 'PyDataStudio/zipcodes.json')")
 
推荐文章
爱吹牛的瀑布  ·  PySpark 读写 JSON 文件到 DataFrame开发者社区
1 月前
活泼的高山  ·  python在excel增加新一行作为标题行 - CSDN文库
6 月前
淡定的仙人球  ·  木偶师page.goTo()永远不会完成-腾讯云开发者社区-腾讯云
9 月前
有爱心的卡布奇诺  ·  给自己写一个案件管理工具(二) - 少数派
1 年前
逃跑的灭火器  ·  容器环境与可插拔设备(一)---udev - 知乎
1 年前
善良的伏特加  ·  SSH配置Linux免密登录_51CTO博客_linux配置ssh免密登录
1 年前
今天看啥   ·   Py中国   ·   codingpro   ·   小百科   ·   link之家   ·   卧龙AI搜索
删除内容请联系邮箱 2879853325@qq.com
Code - 代码工具平台
© 2024 ~ 沪ICP备11025650号