本教程介绍如何将 Azure Databricks 群集连接到启用了 Azure Data Lake Storage Gen2 的 Azure 存储帐户中存储的数据。 建立此连接后,即可在群集本机上针对数据运行查询和分析。

在本教程中,将:

  • 将非结构化数据引入存储帐户中
  • 对 Blob 存储中的数据运行分析
  • 如果没有 Azure 订阅,请在开始之前创建一个 免费帐户

  • 创建一个采用分层命名空间的存储帐户 (Azure Data Lake Storage Gen2)

    请参阅 创建用于 Azure Data Lake Storage Gen2 的存储帐户

  • 请确保你的用户帐户分配有 存储 Blob 数据参与者角色

  • 安装 AzCopy v10。 请参阅 使用 AzCopy v10 传输数据

  • 创建服务主体,创建客户端密码,然后向服务主体授予对存储帐户的访问权限。

    请参阅 教程:连接到 Azure Data Lake Storage Gen2 (步骤 1 到 3)。 完成这些步骤后,请确保将租户 ID、应用 ID 和客户端密码值粘贴到文本文件中。 很快就会需要这些值。

    下载航班数据

    本教程使用美国运输统计局的航班数据,演示如何执行 ETL 操作。 必须下载该数据才能完成本教程。

  • 下载 On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip 文件。 该文件包含航班数据。

  • 将压缩文件的内容解压缩,并记下文件名和文件路径。 在稍后的步骤中需要使用此信息。

    将源数据复制到存储帐户中

    使用 AzCopy 将 .csv 文件中的数据复制到 Data Lake Storage Gen2 帐户。

  • 打开命令提示符窗口,输入以下命令登录到存储帐户。

    azcopy login
    

    按照命令提示符窗口中的说明对用户帐户进行身份验证。

  • 若要复制 .csv 帐户中的数据,请输入以下命令。

    azcopy cp "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/folder1/On_Time.csv
    
  • <csv-folder-path> 占位符值替换为 .csv 文件的路径 。

  • <storage-account-name> 占位符值替换为存储帐户的名称。

  • <container-name> 占位符替换为存储帐户中容器的名称。

  • 将以下代码块复制并粘贴到第一个单元格中,但目前请勿运行此代码。

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/folder1",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  • 在此代码块中,请将 appIdclientSecrettenantstorage-account-name 占位符值替换为在完成本教程的先决条件时收集的值。 将 container-name 占位符值替换为容器的名称。

  • SHIFT + ENTER 键,运行此块中的代码。

    请将此笔记本保持打开状态,因为稍后要在其中添加命令。

    使用 Databricks Notebook 将 CSV 转换为 Parquet

    在前面创建的笔记本中添加一个新的单元,并将以下代码粘贴到该单元中。

    # Use the previously established DBFS mount point to read the data.
    # create a data frame to read data.
    flightDF = spark.read.format('csv').options(
        header='true', inferschema='true').load("/mnt/flightdata/*.csv")
    # read the airline csv file and write the output to parquet format for easy query.
    flightDF.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
    print("Done")
    

    在新单元中粘贴以下代码,以获取通过 AzCopy 上传的 CSV 文件列表。

    import os.path
    import IPython
    from pyspark.sql import SQLContext
    display(dbutils.fs.ls("/mnt/flightdata"))
    

    若要创建新文件并列出 parquet/flights 文件夹中的文件,请运行以下脚本:

    dbutils.fs.put("/mnt/flightdata/1.txt", "Hello, World!", True)
    dbutils.fs.ls("/mnt/flightdata/parquet/flights")
    

    通过这些代码示例,你已使用启用了 Data Lake Storage Gen2 的存储帐户中存储的数据探讨了 HDFS 的层次结构性质。

    接下来,可以开始查询上传到存储帐户中的数据。 将下面的每个代码块输入到 Cmd 1 中,然后按 Cmd + Enter 运行 Python 脚本。

    若要为数据源创建数据帧,请运行以下脚本:

  • <csv-folder-path> 占位符值替换为 .csv 文件的路径 。
  • # Copy this into a Cmd cell in your notebook.
    acDF = spark.read.format('csv').options(
        header='true', inferschema='true').load("/mnt/flightdata/On_Time.csv")
    acDF.write.parquet('/mnt/flightdata/parquet/airlinecodes')
    # read the existing parquet file for the flights database that was created earlier
    flightDF = spark.read.format('parquet').options(
        header='true', inferschema='true').load("/mnt/flightdata/parquet/flights")
    # print the schema of the dataframes
    acDF.printSchema()
    flightDF.printSchema()
    # print the flight database size
    print("Number of flights in the database: ", flightDF.count())
    # show the first 20 rows (20 is the default)
    # to show the first n rows, run: df.show(n)
    acDF.show(100, False)
    flightDF.show(20, False)
    # Display to run visualizations
    # preferably run this in a separate cmd cell
    display(flightDF)
    

    输入此脚本,以针对数据运行一些基本分析查询。

    # Run each of these queries, preferably in a separate cmd cell for separate analysis
    # create a temporary sql view for querying flight information
    FlightTable = spark.read.parquet('/mnt/flightdata/parquet/flights')
    FlightTable.createOrReplaceTempView('FlightTable')
    # create a temporary sql view for querying airline code information
    AirlineCodes = spark.read.parquet('/mnt/flightdata/parquet/airlinecodes')
    AirlineCodes.createOrReplaceTempView('AirlineCodes')
    # using spark sql, query the parquet file to return total flights in January and February 2016
    out1 = spark.sql("SELECT * FROM FlightTable WHERE Month=1 and Year= 2016")
    NumJan2016Flights = out1.count()
    out2 = spark.sql("SELECT * FROM FlightTable WHERE Month=2 and Year= 2016")
    NumFeb2016Flights = out2.count()
    print("Jan 2016: ", NumJan2016Flights, " Feb 2016: ", NumFeb2016Flights)
    Total = NumJan2016Flights+NumFeb2016Flights
    print("Total flights combined: ", Total)
    # List out all the airports in Texas
    out = spark.sql(
        "SELECT distinct(OriginCityName) FROM FlightTable where OriginStateName = 'Texas'")
    print('Airports in Texas: ', out.show(100))
    # find all airlines that fly from Texas
    out1 = spark.sql(
        "SELECT distinct(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
    print('Airlines that fly to/from Texas: ', out1.show(100, False))
    

    如果不再需要本文中创建的资源,可以删除资源组和所有相关资源。 为此,请选择存储帐户所在的资源组,然后选择“删除”。

    使用 Apache Hive on Azure HDInsight 提取、转换和加载数据

  •