相关文章推荐
活泼的卤蛋  ·  EAGAIN、EWOULDBLOCK、EIN ...·  1 年前    · 
重感情的板凳  ·  sql server 为空转换 ...·  1 年前    · 
仗义的登山鞋  ·  使用 Azure Functions ...·  1 年前    · 
兴奋的草稿本  ·  how to print void ...·  2 年前    · 

本文通过演练一些简单的示例来说明 PySpark 的用法。 本文假定你了解基本的 Apache Spark 概念 ,并在连接到计算的 Azure Databricks 笔记本 中运行命令。 你将使用示例数据创建 DataFrame,对这些数据执行基本转换(包括行和列操作),合并多个 DataFrame 并聚合这些数据,可视化这些数据,然后将其保存到表或文件中。

本文中的一些示例使用 Databricks 提供的示例数据来演示如何使用 DataFrame 加载、转换和保存数据。 如果想要使用 Databricks 中尚不存在的你自己的数据,可以先将其上传,然后使用这些数据创建 DataFrame。 请参阅 使用文件上传创建或修改表 将文件上传到 Unity Catalog 卷

关于 Databricks 示例数据

Databricks 在 samples 目录和 /databricks-datasets 目录中提供了示例数据。

  • 若要访问 samples 目录中的示例数据,请使用格式 samples.<schema-name>.<table-name> 。 本文使用 samples.tpch 架构中的表,其中包含一家虚构企业的数据。 customer 表包含客户的相关信息, orders 包含这些客户所下订单的相关信息。
  • 使用 dbutils.fs.ls 浏览 /databricks-datasets 中的数据。 使用 Spark SQL 或 DataFrame,通过文件路径来查询此位置中的数据。 若要详细了解 Databricks 提供的示例数据,请参阅 示例数据集
  • 导入数据类型

    许多 PySpark 操作都要求使用 SQL 函数或与本机 Spark 类型交互。 直接导入所需的函数和类型,或者为了避免重写 Python 内置函数,请使用通用别名导入这些模块。

    # import select functions and types
    from pyspark.sql.types import IntegerType, StringType
    from pyspark.sql.functions import floor, round
    # import modules using an alias
    import pyspark.sql.types as T
    import pyspark.sql.functions as F
    

    有关数据类型的完整列表,请参阅 Spark 数据类型

    有关 PySpark SQL 函数的完整列表,请参阅 Spark 函数

    创建 DataFrame

    可通过多种方法来创建 DataFrame。 通常,需要根据数据源(例如表或文件集合)来定义 DataFrame。 然后,如 Apache Spark 基本概念部分中所述,使用 display 等操作触发要执行的转换。 display 方法可输出 DataFrame。

    创建包含指定值的 DataFrame

    若要创建包含指定值的 DataFrame,请使用 createDataFrame 方法,其中行以元组列表的形式表示:

    df_children = spark.createDataFrame(
      data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
      schema = ['name', 'age'])
    display(df_children)
    

    请注意,在输出中,df_children 列的数据类型是自动推断的。 也可以通过添加架构来指定类型。 架构是使用由 StructType 组成的 StructFields 定义的,其中指定了名称、数据类型和指示它们是否包含 null 值的布尔标志。 必须从 pyspark.sql.types 中导入数据类型。

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    df_children_with_schema = spark.createDataFrame(
      data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
      schema = StructType([
        StructField('name', StringType(), True),
        StructField('age', IntegerType(), True)
    display(df_children_with_schema)
    

    根据 Unity Catalog 中的表创建 DataFrame

    若要根据 Unity Catalog 中的表创建 DataFrame,请使用 table 方法,并通过格式 <catalog-name>.<schema-name>.<table-name> 来标识表。 单击左侧导航栏上的“目录”,使用“目录资源管理器”导航到表。 单击它,然后选择“复制表路径”,将表路径插入笔记本中

    以下示例会加载表 samples.tpch.customer,但你也可以提供自己的表的路径。

    df_customer = spark.table('samples.tpch.customer')
    display(df_customer)
    

    根据上传的文件创建 DataFrame

    若要根据上传到 Unity Catalog 卷的文件创建 DataFrame,请使用 read 属性。 此方法会返回一个 DataFrameReader,然后可将其用于读取相应的格式。 单击左侧小边栏上的目录选项,并使用目录浏览器查找文件。 选择该文件,然后单击“复制卷文件路径”

    下面的示例从 *.csv 文件读取数据,但 DataFrameReader 支持上传许多其他格式的文件。 请参阅 DataFrameReader 方法

    # Assign this variable your full volume file path
    volume_file_path = ""
    df_csv = (spark.read
      .format("csv")
      .option("header", True)
      .option("inferSchema", True)
      .load(volume_file_path)
    display(df_csv)
    

    有关 Unity Catalog 卷的详细信息,请参阅什么是 Unity Catalog 卷?

    根据 JSON 响应创建 DataFrame

    若要根据 REST API 返回的 JSON 响应有效负载创建 DataFrame,请使用 Python requests 包来查询和分析响应。 必须导入包才能使用。 此示例使用来自美国食品和药物管理局药物申请数据库的数据。

    import requests
    # Download data from URL
    url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
    response = requests.get(url)
    # Create the DataFrame
    df_drugs = spark.createDataFrame(response.json()["results"])
    display(df_drugs)
    

    有关在 Databricks 上处理 JSON 等半结构化数据的信息,请参阅对半结构化数据建模

    选择 JSON 字段或对象

    若要从转换后的 JSON 中选择特定字段或对象,请使用 [] 表示法。 例如,若要选择本身就是产品数组的 products 字段,请使用以下表示法:

    display(df_drugs.select(df_drugs["products"]))
    

    还可以将方法调用链接在一起,以遍历多个字段。 例如,若要输出药物申请表中第一个产品的品牌名:

    display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
    

    根据文件创建 DataFrame

    为了演示如何根据文件创建 DataFrame,此示例将加载 /databricks-datasets 目录中的 CSV 数据。

    若要导航到示例数据集,可以使用 Databricks Utilties 文件系统命令。 以下示例使用 dbutils 列出在 /databricks-datasets 中可用的数据集:

    display(dbutils.fs.ls('/databricks-datasets'))
    

    或者,可以使用 %fs 访问 Databricks CLI 文件系统命令,如以下示例所示:

    %fs ls '/databricks-datasets'
    

    若要根据文件或文件目录创建 DataFrame,请在 load 方法中指定路径:

    df_population = (spark.read
      .format("csv")
      .option("header", True)
      .option("inferSchema", True)
      .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
    display(df_population)
                  使用 DataFrame 转换数据
    

    使用 DataFrame,可以利用内置方法对数据进行排序、筛选和聚合,从而轻松转换数据。 许多转换并不作为 DataFrame 的方法指定,而是在 spark.sql.functions 包中提供。 请参阅 Databricks Spark SQL 函数

    联接数据帧

    Spark 提供了许多基本列操作:

    强制转换列类型

    若要输出 DataFrame 中的所有列,请使用 columns,例如 df_customer.columns

    可以使用 selectcol 选择特定列。 col 函数位于 pyspark.sql.functions 子模块中。

    from pyspark.sql.functions import col
    df_customer.select(
      col("c_custkey"),
      col("c_acctbal")
    

    还可以使用 expr 来引用列,它可以获取定义为字符串的表达式:

    from pyspark.sql.functions import expr
    df_customer.select(
      expr("c_custkey"),
      expr("c_acctbal")
    

    还可以使用 selectExpr,它可接受 SQL 表达式:

    df_customer.selectExpr(
      "c_custkey as key",
      "round(c_acctbal) as account_rounded"
    

    若要使用字符串字面量选择列,请执行以下操作:

    df_customer.select(
      "c_custkey",
      "c_acctbal"
    

    若要从特定 DataFrame 显式选择列,可以使用 [] 运算符或 . 运算符。 (. 运算符不能用于选择以整数开头的列或包含空格或特殊字符的列。)在联接 DataFrame 时,如果某些列具有相同的名称,这种方法尤其有用。

    df_customer.select(
      df_customer["c_custkey"],
      df_customer["c_acctbal"]
    
    df_customer.select(
      df_customer.c_custkey,
      df_customer.c_acctbal
    

    若要创建新列,请使用 withColumn 方法。 以下示例创建一个新列,该列包含一个布尔值,该值根据客户帐户余额 c_acctbal 是否超过 1000 来确定:

    df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
    

    若要重命名列,请使用 withColumnRenamed 方法,该方法可接受现有列名称和新列名称:

    df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
    

    如果要在聚合过程中同时重命名列,alias 方法特别有用:

    from pyspark.sql.functions import avg
    df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
        avg(df_customer["c_acctbal"]).alias("avg_account_balance")
    display(df_segment_balance)
    

    强制转换列类型

    在某些情况下,可能需要更改 DataFrame 中一列或多列的数据类型。 为此,请使用 cast 方法在列数据类型之间进行转换。 以下示例演示如何使用 col 方法引用列,将列从整数转换为字符串类型:

    from pyspark.sql.functions import col
    df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
    print(type(df_casted))
    

    若要删除列,可以在选择时忽略这些列或使用 select(*) except,也可以使用 drop 方法:

    df_customer_flag_renamed.drop("balance_flag_renamed")
    

    还可以一次性删除多列:

    df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
    

    Spark 提供了许多基本行操作:

    删除重复行 处理 null 值 对行进行排序

    若要筛选行,请对 DataFrame 使用 filterwhere 方法,以便仅返回特定行。 若要标识要筛选的列,请使用 col 方法或计算结果为列的表达式。

    from pyspark.sql.functions import col
    df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
    

    若要根据多个条件进行筛选,请使用逻辑运算符。 例如,&| 可分别用于表示 ANDOR 条件。 以下示例筛选 c_nationkey 等于 20c_acctbal 大于 1000 的行。

    df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
    
    df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
                  删除重复行
    

    若要去除重复行,请使用 distinct,它仅返回非重复行。

    df_unique = df_customer.distinct()
                  处理 null 值
    

    若要处理 null 值,请使用 na.drop 方法删除包含 null 值的行。 使用此方法,可以指定是要删除包含 any null 值的行,还是要删除包含 all null 值的行。

    若要删除任何 null 值,请使用以下示例之一。

    df_customer_no_nulls = df_customer.na.drop()
    df_customer_no_nulls = df_customer.na.drop("any")
    

    如果只想筛除全部为 null 值的行,请使用以下方法:

    df_customer_no_nulls = df_customer.na.drop("all")
    

    通过指定以下命令,可以对列的子集应用此操作,如下所示:

    df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
    

    若要填写缺失值,请使用 fill 方法。 可以选择将此方法应用于所有列或列的子集。 在下面的示例中,帐户余额 c_acctbal 为 null 值的帐户余额将填入 0

    df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
    

    若要将字符串替换为其他值,请使用 replace 方法。 在下面的示例中,任何空地址字符串都将替换为 UNKNOWN 一词:

    df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
    

    若要追加行,需要使用 union 方法创建新的 DataFrame。 在以下示例中,将之前创建的 DataFrame df_that_one_customerdf_filtered_customer 组合在一起,它将返回一个包含三个客户的 DataFrame:

    df_appended_rows = df_that_one_customer.union(df_filtered_customer)
    display(df_appended_rows)
    

    还可以将 DataFrame 写入表,然后追加新行,从而将其组合在一起。 对于生产工作负载,随着数据规模的增长,将数据源递增地处理到目标表可以大幅降低延迟和计算成本。 请参阅 Lakeflow Connect 中的标准连接器

    对行进行排序

    对大量数据进行排序的成本可能很高,而且如果存储已排序的数据并使用 Spark 重新加载数据,则不能保证顺序。 请确保你在使用排序时是有明确意图的。

    若要按一列或多列对行进行排序,请使用 sortorderBy 方法。 默认情况下,这些方法按升序排序:

    df_customer.orderBy(col("c_acctbal"))
    

    若要按降序筛选,请使用 desc

    df_customer.sort(col("c_custkey").desc())
    

    以下示例演示如何对两列进行排序:

    df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
    df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
    

    若要限制在 DataFrame 排序后要返回的行数,请使用 limit 方法。 以下示例仅显示前 10 个结果:

    display(df_sorted.limit(10))
                  联接数据帧
    

    若要联接两个或多个 DataFrame,请使用 join 方法。 可以在 how(联接类型)和 on(基于哪些列进行联接)参数中指定联接 DataFrame 的方式。 常见的联接类型包括:

    inner:这是默认的联接类型,它返回的 DataFrame 仅保留那些在 DataFrame 中的 on 参数有匹配项的行。 left:此类型会保留第一个指定 DataFrame 的所有行,以及第二个指定 DataFrame 中与第一个 DataFrame 有匹配项的行。 outer:无论是否有匹配项,外部联接都会保留这两个 DataFrame 中的所有行。

    有关联接的详细信息,请参阅在 Azure Databricks 上使用联接。 有关 PySpark 支持的联接列表,请参阅 DataFrame 联接

    以下示例返回一个 DataFrame,其中 orders DataFrame 的每一行都与 customers DataFrame 中的相应行联接。 使用的是内联,因为预期每个订单都对应于一个客户。

    df_customer = spark.table('samples.tpch.customer')
    df_order = spark.table('samples.tpch.orders')
    df_joined = df_order.join(
      df_customer,
      on = df_order["o_custkey"] == df_customer["c_custkey"],
      how = "inner"
    display(df_joined)
    

    若要基于多个条件进行联接,请使用布尔运算符(例如 &|)分别指定 ANDOR。 以下示例添加了一个附加条件,仅筛选 o_totalprice 大于 500,000 的行:

    df_customer = spark.table('samples.tpch.customer')
    df_order = spark.table('samples.tpch.orders')
    df_complex_joined = df_order.join(
      df_customer,
      on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
      how = "inner"
    display(df_complex_joined)
    

    若要聚合 DataFrame 中的数据(类似于 SQL 中的 GROUP BY),请使用 groupBy 方法指定要作为分组依据的列,并使用 agg 方法指定聚合。 从 avg 中导入常见聚合,包括 summaxminpyspark.sql.functions。 以下示例显示了按市场细分划分的平均客户余额:

    from pyspark.sql.functions import avg
    # group by one column
    df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
        avg(df_customer["c_acctbal"])
    display(df_segment_balance)
    
    from pyspark.sql.functions import avg
    # group by two columns
    df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
        avg(df_customer["c_acctbal"])
    display(df_segment_nation_balance)
    

    某些聚合是操作,这意味着它们会触发计算。 在这种情况下,就无需使用其他操作来输出结果。

    若要对 DataFrame 中的行进行计数,请使用 count 方法:

    df_customer.count()
    

    转换 DataFrame 的方法会返回 DataFrame,在调用操作之前,Spark 不会对转换执行操作。 这种延迟计算意味着你可以将多个方法链接起来,既方便又易读。 以下示例演示了如何链接筛选、聚合和排序:

    from pyspark.sql.functions import count
    df_chained = (
        df_order.filter(col("o_orderstatus") == "F")
        .groupBy(col("o_orderpriority"))
        .agg(count(col("o_orderkey")).alias("n_orders"))
        .sort(col("n_orders").desc())
    display(df_chained)
    

    可视化 DataFrame

    若要在笔记本中可视化 DataFrame,请单击 DataFrame 左上角的表旁边的 + 符号,然后选择“可视化”,根据 DataFrame 添加一个或多个图表。 有关可视化效果的详细信息,请参阅 Databricks 笔记本和 SQL 编辑器中的可视化效果

    display(df_order)
    

    若要执行其他可视化,Databricks 建议使用适用于 Spark 的 Pandas API。 使用 .pandas_api(),可以强制转换为 Spark DataFrame 对应的 Pandas API。 有关详细信息,请参阅 Spark 上的 Pandas API

    转换数据后,可以使用 DataFrameWriter 方法保存数据。 可以在 DataFrameWriter 中找到这些方法的完整列表。 以下部分演示了如何将 DataFrame 另存为表和数据文件集合。

    将 DataFrame 另存为表

    若要将 DataFrame 另存为 Unity Catalog 中的表,请使用 write.saveAsTable 方法,并按照 <catalog-name>.<schema-name>.<table-name> 格式指定路径。

    df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
    

    将 DataFrame 写入为 CSV

    若要将 DataFrame 写入为 *.csv 格式,请使用 write.csv 方法,并指定格式和选项。 默认情况下,如果指定路径存在数据,写入操作将失败。 可以指定以下模式之一以执行不同的操作:

    overwrite 使用 DataFrame 内容覆盖目标路径中的所有现有数据。 append 将 DataFrame 的内容追加到目标路径中的数据。 ignore 在目标路径中存在数据时将写入操作静默失败。

    以下示例演示了如何使用 DataFrame 内容覆盖数据,以 CSV 文件保存:

    # Assign this variable your file path
    file_path = ""
    (df_joined.write
      .format("csv")
      .mode("overwrite")
      .write(file_path)
    

    若要在 Databricks 上利用更多 Spark 功能,请参阅:

    可视化效果 使用作业实现自动化 IDE 和 SDK UDF(用户定义的函数) 机器学习库 (MLlib)