本教程介绍如何在 Azure Databricks 中使用 Apache Spark Python (PySpark) 数据帧 API、Apache Spark Scala 数据帧 API 和 SparkR SparkDataFrame API 加载和转换数据。

本教程结束时,你可了解数据帧是什么并熟悉以下任务:

Python

  • 定义变量并将公共数据复制到 Unity Catalog 卷
  • 使用 Python 创建数据帧
  • 将数据从 CSV 文件加载到数据帧
  • 查看数据帧并与之交互
  • 保存数据帧
  • 在 PySpark 中运行 SQL 查询
  • 另请参阅 Apache Spark PySpark API 参考

    Scala

  • 定义变量并将公共数据复制到 Unity Catalog 卷
  • 使用 Scala 创建数据帧
  • 将数据从 CSV 文件加载到数据帧
  • 查看数据帧并与之交互
  • 保存数据帧
  • 在 Apache Spark 中运行 SQL 查询
  • 另请参阅 Apache Spark Scala API 参考

  • 定义变量并将公共数据复制到 Unity Catalog 卷
  • 创建 SparkR SparkDataFrame
  • 将数据从 CSV 文件加载到数据帧
  • 查看数据帧并与之交互
  • 保存数据帧
  • 在 SparkR 中运行 SQL 查询
  • 另请参阅 Apache SparkR API 参考

    什么是数据帧?

    数据帧是一种有标签的二维数据结构,其中的列可能会有不同的类型。 可将数据帧视为电子表格、SQL 表或序列对象的字典。 Apache Spark 数据帧提供了一组丰富的函数(选择列、筛选、联接、聚合),让你可以有效地解决常见的数据分析问题。

    Apache Spark 数据帧是基于弹性分布式数据集 (RDD) 的抽象。 Spark 数据帧和 Spark SQL 使用统一的规划和优化引擎,使你能够在 Azure Databricks 上的所有受支持的语言(Python、SQL、Scala 和 R)中获得几乎相同的性能。

    若要完成以下教程,必须满足以下要求:

  • 若要使用本教程中的示例,必须已为工作区 启用 Unity 目录

  • 本教程中的示例使用 Unity Catalog 来存储示例数据。 若要使用这些示例,请创建一个卷,并使用该卷的目录、架构和卷名称来设置示例使用的卷路径。

  • 必须在 Unity 目录中具有以下权限:

  • READ VOLUME WRITE VOLUME ALL PRIVILEGES 表示本教程使用的卷。
  • USE SCHEMA ALL PRIVILEGES 表示本教程使用的架构。
  • USE CATALOG ALL PRIVILEGES 表示本教程使用的目录。
  • 若要设置这些权限,请联系 Databricks 管理员或参阅 Unity Catalog 特权和安全对象

    有关本文的完整笔记本,请参阅 数据帧教程笔记本

    步骤 1:定义变量并加载 CSV 文件

    此步骤定义要在本教程中使用的变量,然后将包含婴儿姓名数据的 CSV 文件从 health.data.ny.gov 加载到 Unity Catalog 卷。

  • 单击 新建图标 图标打开新笔记本。 若要了解如何在 Azure Databricks 笔记本中导航,请参阅 Databricks 笔记本界面和控件

  • 将以下代码复制并粘贴到新的空笔记本单元格中: 将 <catalog-name> <schema-name> <volume-name> 替换为 Unity 目录卷的目录、架构和卷名称。 请将 <table_name> 替换为你选择的表名称。 本教程稍后会将婴儿姓名数据加载到此表中。

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    
    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  • Shift+Enter 以运行单元格并创建新的空白单元格。

  • 将以下代码复制并粘贴到新的空笔记本单元格中: 此代码使用 Databricks dbutuils 命令将 rows.csv 文件从 health.data.ny.gov 复制到 Unity Catalog 卷。

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    Scala

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    
    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    
  • Shift+Enter 以运行单元格,然后移动到下一个单元格。

    步骤 2:创建数据帧

    此步骤使用测试数据创建名为 df1 的数据帧,然后显示其内容。

  • 将以下代码复制并粘贴到新的空笔记本单元格中: 此代码使用测试数据创建数据帧,然后显示数据帧的内容和架构。

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    
    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  • Shift+Enter 以运行单元格,然后移动到下一个单元格。

    步骤 3:将数据从 CSV 文件加载到数据帧

    此步骤从之前加载到 Unity Catalog 卷的 CSV 文件中创建名为 df_csv 的数据帧。 请参阅 spark.read.csv

  • 将以下代码复制并粘贴到新的空笔记本单元格中: 此代码将婴儿姓名数据从 CSV 文件加载到数据帧 df_csv,然后显示该数据帧的内容。

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    display(dfCsv)
    
    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    display(df_csv)
    
  • Shift+Enter 以运行单元格,然后移动到下一个单元格。

    可以从许多受支持的文件格式加载数据。

    步骤 4:查看数据帧并与之交互

    使用以下方法查看婴儿姓名数据帧并与之交互。

    了解如何显示 Apache Spark 数据帧的架构。 Apache Spark 使用术语“架构”来指代数据帧中列的名称和数据类型。

    Azure Databricks 也使用术语“架构”来描述注册到目录的表集合。

  • 将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 .printSchema() 方法显示数据帧的架构,以便查看两个数据帧的架构 - 准备合并这两个数据帧。

    Python

    df_csv.printSchema()
    df1.printSchema()
    

    Scala

    dfCsv.printSchema()
    df1.printSchema()
    
    printSchema(df_csv)
    printSchema(df1)
    
  • Shift+Enter 以运行单元格,然后移动到下一个单元格。

    重命名数据帧中的列

    了解如何重命名数据帧中的列。

  • 将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于重命名 df1_csv 数据帧中的列,以匹配 df1 数据帧中的相应列。 此代码使用 Apache Spark withColumnRenamed() 方法。

    Python

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema
    

    Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    
    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  • Shift+Enter 以运行单元格,然后移动到下一个单元格。

    合并数据帧

    了解如何创建一个新的数据帧,用于将某个数据帧的行添加到另一个数据帧。

  • 将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark union() 方法将第一个数据帧 df 的内容与数据帧 df_csv 合并,后者包含从 CSV 文件加载的婴儿姓名数据。

    Python

    df = df1.union(df_csv)
    display(df)
    

    Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    
    display(df <- union(df1, df_csv))
    
  • Shift+Enter 以运行单元格,然后移动到下一个单元格。

    筛选数据帧中的行

    使用 Apache Spark .filter().where() 方法筛选行,发现数据集中最受欢迎的婴儿姓名。 使用筛选来选择要在数据帧中返回或修改的行子集。 性能或语法没有差别,如以下示例中所示。

    使用 .filter() 方法

  • 将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark .filter() 方法显示数据帧中计数超过 50 的行。

    Python
    display(df.filter(df["Count"] > 50))
    
    Scala
    display(df.filter(df("Count") > 50))
    
    display(filteredDF <- filter(df, df$Count > 50))
    
  • Shift+Enter 以运行单元格,然后移动到下一个单元格。

    使用 .where() 方法

  • 将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark .where() 方法显示数据帧中计数超过 50 的行。

    Python
    display(df.where(df["Count"] > 50))
    
    Scala
    display(df.where(df("Count") > 50))
    
    display(filtered_df <- where(df, df$Count > 50))
    
  • Shift+Enter 以运行单元格,然后移动到下一个单元格。

    从数据帧中选择列并按频率排序

    使用 select() 方法了解婴儿姓名的使用频率,以指定要返回的数据帧中的列。 使用 Apache Spark orderbydesc 函数对结果进行排序。

    Apache Spark 的 pyspark.sql 模块为 SQL 函数提供支持。 在这些函数中,本教程中使用的函数包括 Apache Spark orderBy()desc()expr() 函数。 可以根据需要将它们导入会话来使用这些函数。

  • 将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于导入 desc() 函数,然后使用 Apache Spark select() 方法以及 Apache Spark orderBy()desc() 函数按降序显示最常用的姓名及其计数。

    Python

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    
    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  • Shift+Enter 以运行单元格,然后移动到下一个单元格。

    创建子集数据帧

    了解如何从现有数据帧创建子集数据帧。

  • 将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark filter 方法创建新的数据帧,以按年份、计数和性别限制数据。 它使用 Apache Spark select() 方法来限制列。 它还使用 Apache Spark orderBy()desc() 函数按计数对新的数据帧进行排序。

    Python

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    
    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  • Shift+Enter 以运行单元格,然后移动到下一个单元格。

    步骤 5:保存数据帧

    了解如何保存数据帧。 可以将数据帧保存到表,或者将数据帧写入一个或多个文件。

    将数据帧保存到表

    默认情况下,Azure Databricks 对所有表使用 Delta Lake 格式。 若要保存数据帧,必须拥有目录和架构上的 CREATE 表权限。

  • 将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用在本教程开始时定义的变量将数据帧的内容保存到表中。

    Python

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    
    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  • Shift+Enter 以运行单元格,然后移动到下一个单元格。

    大多数 Apache Spark 应用程序都以分布式方式处理大型数据集。 Apache Spark 会写出文件目录,而不是单个文件。 Delta Lake 会拆分 Parquet 文件夹和文件。 许多数据系统都可以读取这些目录的文件。 Azure Databricks 建议为大多数应用程序使用表而不是文件路径。

    将数据帧保存到 JSON 文件

  • 将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于将数据帧保存到 JSON 文件的目录中。

    Python

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    Scala

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    
    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  • Shift+Enter 以运行单元格,然后移动到下一个单元格。

    从 JSON 文件读取数据帧

    了解如何使用 Apache Spark spark.read.format() 方法将 JSON 数据从目录读取到数据帧中。

  • 将以下代码复制并粘贴到空的笔记本单元格中。 此代码显示在上一示例中保存的 JSON 文件。

    Python

    display(spark.read.format("json").json("/tmp/json_data"))
    

    Scala

    display(spark.read.format("json").json("/tmp/json_data"))
    
    display(read.json("/tmp/json_data"))
    
  • Shift+Enter 以运行单元格,然后移动到下一个单元格。

    其他任务:在 PySpark、Scala 和 R 中运行 SQL 查询

    Apache Spark 数据帧提供以下选项,用于将 SQL 与 PySpark、Scala 和 R 合并在一起。可以在为本教程创建的同一笔记本中运行以下代码。

    将列指定为 SQL 查询

    了解如何使用 Apache Spark selectExpr() 方法。 这是 select() 方法的变体,它用于接受 SQL 表达式并返回更新的数据帧。 此方法允许使用 SQL 表达式,例如 upper

  • 将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark selectExpr() 方法和 SQL upper 表达式将字符串列转换为大写(并重命名列)。

    Python

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    
    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  • Shift+Enter 以运行单元格,然后移动到下一个单元格。

    使用 expr() 对列使用 SQL 语法

    了解如何导入并使用 Apache Spark expr() 函数,以在指定列的任何位置使用 SQL 语法。

  • 将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于导入 expr() 函数,然后使用 Apache Spark expr() 函数和 SQL lower 表达式将字符串列转换为小写(并重命名列)。

    Python

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    
    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  • Shift+Enter 以运行单元格,然后移动到下一个单元格。

    使用 spark.sql() 函数运行任意 SQL 查询

    了解如何使用 Apache Spark spark.sql() 函数运行任意 SQL 查询。

  • 将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark spark.sql() 函数通过 SQL 语法来查询 SQL 表。

    Python

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    
    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  • Shift+Enter 以运行单元格,然后移动到下一个单元格。

    数据帧教程笔记本

    以下笔记本包含本教程中的示例查询。

    Python

    使用 Python 的数据帧教程

    获取笔记本

    Scala

    使用 Scala 的数据帧教程

    获取笔记本

    使用 R 的数据帧教程

    获取笔记本

  • Azure Databricks 上的 PySpark
  • Apache Spark API 参考
  • 在 PySpark 与 Pandas 数据帧之间进行转换
  • Spark 上的 Pandas API
  •