本教程介绍如何将 Azure Databricks 群集连接到启用了 Azure Data Lake Storage Gen2 的 Azure 存储帐户中存储的数据。 建立此连接后,即可在群集本机上针对数据运行查询和分析。
在本教程中,将:
将非结构化数据引入存储帐户中
对 Blob 存储中的数据运行分析
如果没有 Azure 订阅,请在开始之前创建一个
免费帐户
。
创建一个采用分层命名空间的存储帐户 (Azure Data Lake Storage Gen2)
请参阅
创建用于 Azure Data Lake Storage Gen2 的存储帐户
。
请确保你的用户帐户分配有
存储 Blob 数据参与者角色
。
安装 AzCopy v10。 请参阅
使用 AzCopy v10 传输数据
。
创建服务主体,创建客户端密码,然后向服务主体授予对存储帐户的访问权限。
请参阅
教程:连接到 Azure Data Lake Storage Gen2
(步骤 1 到 3)。 完成这些步骤后,请确保将租户 ID、应用 ID 和客户端密码值粘贴到文本文件中。 很快就会需要这些值。
下载航班数据
本教程使用美国运输统计局的航班数据,演示如何执行 ETL 操作。 必须下载该数据才能完成本教程。
下载
On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip
文件。 该文件包含航班数据。
将压缩文件的内容解压缩,并记下文件名和文件路径。 在稍后的步骤中需要使用此信息。
将源数据复制到存储帐户中
使用 AzCopy 将
.csv
文件中的数据复制到 Data Lake Storage Gen2 帐户。
打开命令提示符窗口,输入以下命令登录到存储帐户。
azcopy login
按照命令提示符窗口中的说明对用户帐户进行身份验证。
若要复制 .csv 帐户中的数据,请输入以下命令。
azcopy cp "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/folder1/On_Time.csv
将 <csv-folder-path>
占位符值替换为 .csv 文件的路径 。
将 <storage-account-name>
占位符值替换为存储帐户的名称。
将 <container-name>
占位符替换为存储帐户中容器的名称。
将以下代码块复制并粘贴到第一个单元格中,但目前请勿运行此代码。
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": "<appId>",
"fs.azure.account.oauth2.client.secret": "<clientSecret>",
"fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant>/oauth2/token",
"fs.azure.createRemoteFileSystemDuringInitialization": "true"}
dbutils.fs.mount(
source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/folder1",
mount_point = "/mnt/flightdata",
extra_configs = configs)
在此代码块中,请将 appId
、clientSecret
、tenant
和 storage-account-name
占位符值替换为在完成本教程的先决条件时收集的值。 将 container-name
占位符值替换为容器的名称。
按 SHIFT + ENTER 键,运行此块中的代码。
请将此笔记本保持打开状态,因为稍后要在其中添加命令。
使用 Databricks Notebook 将 CSV 转换为 Parquet
在前面创建的笔记本中添加一个新的单元,并将以下代码粘贴到该单元中。
# Use the previously established DBFS mount point to read the data.
# create a data frame to read data.
flightDF = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/flightdata/*.csv")
# read the airline csv file and write the output to parquet format for easy query.
flightDF.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")
在新单元中粘贴以下代码,以获取通过 AzCopy 上传的 CSV 文件列表。
import os.path
import IPython
from pyspark.sql import SQLContext
display(dbutils.fs.ls("/mnt/flightdata"))
若要创建新文件并列出 parquet/flights 文件夹中的文件,请运行以下脚本:
dbutils.fs.put("/mnt/flightdata/1.txt", "Hello, World!", True)
dbutils.fs.ls("/mnt/flightdata/parquet/flights")
通过这些代码示例,你已使用启用了 Data Lake Storage Gen2 的存储帐户中存储的数据探讨了 HDFS 的层次结构性质。
接下来,可以开始查询上传到存储帐户中的数据。 将下面的每个代码块输入到 Cmd 1 中,然后按 Cmd + Enter 运行 Python 脚本。
若要为数据源创建数据帧,请运行以下脚本:
将 <csv-folder-path>
占位符值替换为 .csv 文件的路径 。
# Copy this into a Cmd cell in your notebook.
acDF = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/flightdata/On_Time.csv")
acDF.write.parquet('/mnt/flightdata/parquet/airlinecodes')
# read the existing parquet file for the flights database that was created earlier
flightDF = spark.read.format('parquet').options(
header='true', inferschema='true').load("/mnt/flightdata/parquet/flights")
# print the schema of the dataframes
acDF.printSchema()
flightDF.printSchema()
# print the flight database size
print("Number of flights in the database: ", flightDF.count())
# show the first 20 rows (20 is the default)
# to show the first n rows, run: df.show(n)
acDF.show(100, False)
flightDF.show(20, False)
# Display to run visualizations
# preferably run this in a separate cmd cell
display(flightDF)
输入此脚本,以针对数据运行一些基本分析查询。
# Run each of these queries, preferably in a separate cmd cell for separate analysis
# create a temporary sql view for querying flight information
FlightTable = spark.read.parquet('/mnt/flightdata/parquet/flights')
FlightTable.createOrReplaceTempView('FlightTable')
# create a temporary sql view for querying airline code information
AirlineCodes = spark.read.parquet('/mnt/flightdata/parquet/airlinecodes')
AirlineCodes.createOrReplaceTempView('AirlineCodes')
# using spark sql, query the parquet file to return total flights in January and February 2016
out1 = spark.sql("SELECT * FROM FlightTable WHERE Month=1 and Year= 2016")
NumJan2016Flights = out1.count()
out2 = spark.sql("SELECT * FROM FlightTable WHERE Month=2 and Year= 2016")
NumFeb2016Flights = out2.count()
print("Jan 2016: ", NumJan2016Flights, " Feb 2016: ", NumFeb2016Flights)
Total = NumJan2016Flights+NumFeb2016Flights
print("Total flights combined: ", Total)
# List out all the airports in Texas
out = spark.sql(
"SELECT distinct(OriginCityName) FROM FlightTable where OriginStateName = 'Texas'")
print('Airports in Texas: ', out.show(100))
# find all airlines that fly from Texas
out1 = spark.sql(
"SELECT distinct(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', out1.show(100, False))
如果不再需要本文中创建的资源,可以删除资源组和所有相关资源。 为此,请选择存储帐户所在的资源组,然后选择“删除”。
使用 Apache Hive on Azure HDInsight 提取、转换和加载数据