本教程介绍如何在 Azure Databricks 中使用 Apache Spark Python (PySpark) 数据帧 API、Apache Spark Scala 数据帧 API 和 SparkR SparkDataFrame API 加载和转换数据。
本教程结束时,你可了解数据帧是什么并熟悉以下任务:
Python
定义变量并将公共数据复制到 Unity Catalog 卷
使用 Python 创建数据帧
将数据从 CSV 文件加载到数据帧
查看数据帧并与之交互
保存数据帧
在 PySpark 中运行 SQL 查询
另请参阅
Apache Spark PySpark API 参考
。
Scala
定义变量并将公共数据复制到 Unity Catalog 卷
使用 Scala 创建数据帧
将数据从 CSV 文件加载到数据帧
查看数据帧并与之交互
保存数据帧
在 Apache Spark 中运行 SQL 查询
另请参阅
Apache Spark Scala API 参考
。
定义变量并将公共数据复制到 Unity Catalog 卷
创建 SparkR SparkDataFrame
将数据从 CSV 文件加载到数据帧
查看数据帧并与之交互
保存数据帧
在 SparkR 中运行 SQL 查询
另请参阅
Apache SparkR API 参考
。
数据帧是一种有标签的二维数据结构,其中的列可能会有不同的类型。 可将数据帧视为电子表格、SQL 表或序列对象的字典。 Apache Spark 数据帧提供了一组丰富的函数(选择列、筛选、联接、聚合),让你可以有效地解决常见的数据分析问题。
Apache Spark 数据帧是基于弹性分布式数据集 (RDD) 的抽象。 Spark 数据帧和 Spark SQL 使用统一的规划和优化引擎,使你能够在 Azure Databricks 上的所有受支持的语言(Python、SQL、Scala 和 R)中获得几乎相同的性能。
若要完成以下教程,必须满足以下要求:
若要使用本教程中的示例,必须已为工作区
启用 Unity 目录
。
本教程中的示例使用 Unity Catalog
卷
来存储示例数据。 若要使用这些示例,请创建一个卷,并使用该卷的目录、架构和卷名称来设置示例使用的卷路径。
必须在 Unity 目录中具有以下权限:
READ VOLUME
和
WRITE VOLUME
或
ALL PRIVILEGES
表示本教程使用的卷。
USE SCHEMA
或
ALL PRIVILEGES
表示本教程使用的架构。
USE CATALOG
或
ALL PRIVILEGES
表示本教程使用的目录。
若要设置这些权限,请联系 Databricks 管理员或参阅
Unity Catalog 特权和安全对象
。
有关本文的完整笔记本,请参阅
数据帧教程笔记本
。
此步骤定义要在本教程中使用的变量,然后将包含婴儿姓名数据的 CSV 文件从
health.data.ny.gov
加载到 Unity Catalog 卷。
单击
图标打开新笔记本。 若要了解如何在 Azure Databricks 笔记本中导航,请参阅
Databricks 笔记本界面和控件
。
将以下代码复制并粘贴到新的空笔记本单元格中: 将
<catalog-name>
、
<schema-name>
和
<volume-name>
替换为 Unity 目录卷的目录、架构和卷名称。 请将
<table_name>
替换为你选择的表名称。 本教程稍后会将婴儿姓名数据加载到此表中。
Python
catalog = "<catalog_name>"
schema = "<schema_name>"
volume = "<volume_name>"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
file_name = "rows.csv"
table_name = "<table_name>"
path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
path_table = catalog + "." + schema
print(path_table) # Show the complete path
print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>"
val schema = "<schema_name>"
val volume = "<volume_name>"
val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
val fileName = "rows.csv"
val tableName = "<table_name>"
val pathVolume = s"/Volumes/$catalog/$schema/$volume"
val pathTable = s"$catalog.$schema"
print(pathVolume) // Show the complete path
print(pathTable) // Show the complete path
catalog <- "<catalog_name>"
schema <- "<schema_name>"
volume <- "<volume_name>"
download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
file_name <- "rows.csv"
table_name <- "<table_name>"
path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
path_table <- paste(catalog, ".", schema, sep = "")
print(path_volume) # Show the complete path
print(path_table) # Show the complete path
按 Shift+Enter
以运行单元格并创建新的空白单元格。
将以下代码复制并粘贴到新的空笔记本单元格中: 此代码使用 Databricks dbutuils 命令将 rows.csv
文件从 health.data.ny.gov 复制到 Unity Catalog 卷。
Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
按 Shift+Enter
以运行单元格,然后移动到下一个单元格。
此步骤使用测试数据创建名为 df1
的数据帧,然后显示其内容。
将以下代码复制并粘贴到新的空笔记本单元格中: 此代码使用测试数据创建数据帧,然后显示数据帧的内容和架构。
Python
data = [[2021, "test", "Albany", "M", 42]]
columns = ["Year", "First_Name", "County", "Sex", "Count"]
df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
# df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42))
val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
val df1 = data.toDF(columns: _*)
display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
// df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
# Load the SparkR package that is already preinstalled on the cluster.
library(SparkR)
data <- data.frame(
Year = as.integer(c(2021)),
First_Name = c("test"),
County = c("Albany"),
Sex = c("M"),
Count = as.integer(c(42))
df1 <- createDataFrame(data)
display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
# head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
按 Shift+Enter
以运行单元格,然后移动到下一个单元格。
此步骤从之前加载到 Unity Catalog 卷的 CSV 文件中创建名为 df_csv
的数据帧。 请参阅 spark.read.csv。
将以下代码复制并粘贴到新的空笔记本单元格中: 此代码将婴儿姓名数据从 CSV 文件加载到数据帧 df_csv
,然后显示该数据帧的内容。
Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}",
header=True,
inferSchema=True,
sep=",")
display(df_csv)
Scala
val dfCsv = spark.read
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", ",")
.csv(s"$pathVolume/$fileName")
display(dfCsv)
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
source="csv",
header = TRUE,
inferSchema = TRUE,
delimiter = ",")
display(df_csv)
按 Shift+Enter
以运行单元格,然后移动到下一个单元格。
可以从许多受支持的文件格式加载数据。
使用以下方法查看婴儿姓名数据帧并与之交互。
打印数据帧架构
了解如何显示 Apache Spark 数据帧的架构。 Apache Spark 使用术语“架构”来指代数据帧中列的名称和数据类型。
Azure Databricks 也使用术语“架构”来描述注册到目录的表集合。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 .printSchema()
方法显示数据帧的架构,以便查看两个数据帧的架构 - 准备合并这两个数据帧。
Python
df_csv.printSchema()
df1.printSchema()
Scala
dfCsv.printSchema()
df1.printSchema()
printSchema(df_csv)
printSchema(df1)
按 Shift+Enter
以运行单元格,然后移动到下一个单元格。
重命名数据帧中的列
了解如何重命名数据帧中的列。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于重命名 df1_csv
数据帧中的列,以匹配 df1
数据帧中的相应列。 此代码使用 Apache Spark withColumnRenamed()
方法。
Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema
Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
dfCsvRenamed.printSchema()
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)
按 Shift+Enter
以运行单元格,然后移动到下一个单元格。
合并数据帧
了解如何创建一个新的数据帧,用于将某个数据帧的行添加到另一个数据帧。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark union()
方法将第一个数据帧 df
的内容与数据帧 df_csv
合并,后者包含从 CSV 文件加载的婴儿姓名数据。
Python
df = df1.union(df_csv)
display(df)
Scala
val df = df1.union(dfCsvRenamed)
display(df)
display(df <- union(df1, df_csv))
按 Shift+Enter
以运行单元格,然后移动到下一个单元格。
筛选数据帧中的行
使用 Apache Spark .filter()
或 .where()
方法筛选行,发现数据集中最受欢迎的婴儿姓名。 使用筛选来选择要在数据帧中返回或修改的行子集。 性能或语法没有差别,如以下示例中所示。
使用 .filter() 方法
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark .filter()
方法显示数据帧中计数超过 50 的行。
Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
display(filteredDF <- filter(df, df$Count > 50))
按 Shift+Enter
以运行单元格,然后移动到下一个单元格。
使用 .where() 方法
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark .where()
方法显示数据帧中计数超过 50 的行。
Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
display(filtered_df <- where(df, df$Count > 50))
按 Shift+Enter
以运行单元格,然后移动到下一个单元格。
从数据帧中选择列并按频率排序
使用 select()
方法了解婴儿姓名的使用频率,以指定要返回的数据帧中的列。 使用 Apache Spark orderby
和 desc
函数对结果进行排序。
Apache Spark 的 pyspark.sql 模块为 SQL 函数提供支持。 在这些函数中,本教程中使用的函数包括 Apache Spark orderBy()
、desc()
和 expr()
函数。 可以根据需要将它们导入会话来使用这些函数。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于导入 desc()
函数,然后使用 Apache Spark select()
方法以及 Apache Spark orderBy()
和 desc()
函数按降序显示最常用的姓名及其计数。
Python
from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
按 Shift+Enter
以运行单元格,然后移动到下一个单元格。
创建子集数据帧
了解如何从现有数据帧创建子集数据帧。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark filter
方法创建新的数据帧,以按年份、计数和性别限制数据。 它使用 Apache Spark select()
方法来限制列。 它还使用 Apache Spark orderBy()
和 desc()
函数按计数对新的数据帧进行排序。
Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)
按 Shift+Enter
以运行单元格,然后移动到下一个单元格。
了解如何保存数据帧。 可以将数据帧保存到表,或者将数据帧写入一个或多个文件。
将数据帧保存到表
默认情况下,Azure Databricks 对所有表使用 Delta Lake 格式。 若要保存数据帧,必须拥有目录和架构上的 CREATE
表权限。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用在本教程开始时定义的变量将数据帧的内容保存到表中。
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
按 Shift+Enter
以运行单元格,然后移动到下一个单元格。
大多数 Apache Spark 应用程序都以分布式方式处理大型数据集。 Apache Spark 会写出文件目录,而不是单个文件。 Delta Lake 会拆分 Parquet 文件夹和文件。 许多数据系统都可以读取这些目录的文件。 Azure Databricks 建议为大多数应用程序使用表而不是文件路径。
将数据帧保存到 JSON 文件
将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于将数据帧保存到 JSON 文件的目录中。
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
按 Shift+Enter
以运行单元格,然后移动到下一个单元格。
从 JSON 文件读取数据帧
了解如何使用 Apache Spark spark.read.format()
方法将 JSON 数据从目录读取到数据帧中。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码显示在上一示例中保存的 JSON 文件。
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
display(read.json("/tmp/json_data"))
按 Shift+Enter
以运行单元格,然后移动到下一个单元格。
Apache Spark 数据帧提供以下选项,用于将 SQL 与 PySpark、Scala 和 R 合并在一起。可以在为本教程创建的同一笔记本中运行以下代码。
将列指定为 SQL 查询
了解如何使用 Apache Spark selectExpr()
方法。 这是 select()
方法的变体,它用于接受 SQL 表达式并返回更新的数据帧。 此方法允许使用 SQL 表达式,例如 upper
。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark selectExpr()
方法和 SQL upper
表达式将字符串列转换为大写(并重命名列)。
Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
按 Shift+Enter
以运行单元格,然后移动到下一个单元格。
使用 expr()
对列使用 SQL 语法
了解如何导入并使用 Apache Spark expr()
函数,以在指定列的任何位置使用 SQL 语法。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于导入 expr()
函数,然后使用 Apache Spark expr()
函数和 SQL lower
表达式将字符串列转换为小写(并重命名列)。
Python
from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr}
// Scala requires us to import the col() function as well as the expr() function
display(df.select(col("Count"), expr("lower(County) as little_name")))
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality
按 Shift+Enter
以运行单元格,然后移动到下一个单元格。
使用 spark.sql() 函数运行任意 SQL 查询
了解如何使用 Apache Spark spark.sql()
函数运行任意 SQL 查询。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark spark.sql()
函数通过 SQL 语法来查询 SQL 表。
Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
按 Shift+Enter
以运行单元格,然后移动到下一个单元格。
以下笔记本包含本教程中的示例查询。
Python
使用 Python 的数据帧教程
获取笔记本
Scala
使用 Scala 的数据帧教程
获取笔记本
使用 R 的数据帧教程
获取笔记本
Azure Databricks 上的 PySpark
Apache Spark API 参考
在 PySpark 与 Pandas 数据帧之间进行转换
Spark 上的 Pandas API