本文适用于 Azure Synapse Link for Azure Cosmos DB 中的 Parquet 文件和容器。 可以使用 Spark 或 SQL 来读取或转换包含复杂架构(例如数组或嵌套结构)的数据。 以下示例是使用单个文档完成的,但可以使用 Spark 或 SQL 将文档数轻松扩展到数十亿个。 本文中包含的代码使用 PySpark (Python)。

复杂的数据类型越来越常见,为数据工程师带来了挑战。 分析嵌套的架构和数组可能涉及耗时且复杂的 SQL 查询。 此外,可能很难重命名或强制转换嵌套列数据类型。 而且,在使用深度嵌套的对象时,可能会遇到性能问题。

数据工程师需要了解如何有效地处理复杂的数据类型,使每个人都可以轻松地对其进行访问。 在下面的示例中,你将在 Azure Synapse Analytics 中使用 Spark,通过数据帧读取对象并将其转换为平面结构。 使用 Azure Synapse Analytics 中的 SQL 无服务器模型直接查询此类对象,并以常规表的形式返回这些结果。

什么是数组和嵌套结构?

以下对象来自 Application Insights 。 在此对象中,有嵌套结构和包含嵌套结构的数组。

"id": "66532691-ab20-11ea-8b1d-936b3ec64e54", "context": { "data": { "eventTime": "2020-06-10T13:43:34.553Z", "samplingRate": "100.0", "isSynthetic": "false" "session": { "isFirst": "false", "id": "38619c14-7a23-4687-8268-95862c5326b1" "custom": { "dimensions": [ "customerInfo": { "ProfileType": "ExpertUser", "RoomName": "", "CustomerName": "diamond", "UserName": "XXXX@yahoo.com" "customerInfo": { "ProfileType": "Novice", "RoomName": "", "CustomerName": "topaz", "UserName": "XXXX@outlook.com"

包含数组和嵌套结构的架构的示例

通过命令 df.printschema 输出对象的数据帧(简称 df )的架构时,会看到以下表示形式:

  • 黄色表示嵌套结构。
  • 绿色表示具有两个元素的数组。
  • 已将 _rid _ts _etag 添加到系统,因为已将文档引入到 Azure Cosmos DB 的事务性存储中。

    前面的数据帧只针对 5 个列和 1 个行计数。 转换后,策展数据帧将有 13 个列和 2 个行(表格格式)。

    平展嵌套结构和分解数组

    借助 Azure Synapse Analytics 中的 Spark,可以轻松地将嵌套结构转换为列,将数组元素转换为多个行。 使用以下步骤进行实施。

    定义用于平展嵌套架构的函数

    无需更改即可使用此函数。 使用以下函数在 PySpark 笔记本 中创建单元格:

    from pyspark.sql.functions import col
    def flatten_df(nested_df):
        stack = [((), nested_df)]
        columns = []
        while len(stack) > 0:
            parents, df = stack.pop()
            flat_cols = [
                col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
                for c in df.dtypes
                if c[1][:6] != "struct"
            nested_cols = [
                for c in df.dtypes
                if c[1][:6] == "struct"
            columns.extend(flat_cols)
            for nested_col in nested_cols:
                projected_df = df.select(nested_col + ".*")
                stack.append((parents + (nested_col,), projected_df))
        return nested_df.select(columns)
    

    使用可平展嵌套架构的函数

    在此步骤中,你将数据帧 (df) 的嵌套架构平展成新的数据帧 (df_flat):

    from pyspark.sql.types import StringType, StructField, StructType
    df_flat = flatten_df(df)
    display(df_flat.limit(10))
    

    display 函数应返回 10 个列和 1 个行。 数组及其嵌套元素仍然存在。

    这里,你将数据帧 df_flat 中的数组 context_custom_dimensions 转换成新数据帧 df_flat_explode。 在下面的代码中,你还定义要选择哪个列:

    from pyspark.sql.functions import explode
    from pyspark.sql.functions import flatten
    from pyspark.sql.functions import arrays_zip
    df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
    .select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
    display(df_flat_explode.limit(10))
    

    display 函数应返回 10 个列和 2 个行。 下一步是将嵌套架构与步骤 1 中定义的函数平展在一起。

    使用可平展嵌套架构的函数

    最后,使用函数将数据帧 df_flat_explode 的嵌套架构平展成新数据帧 df_flat_explode_flat

    df_flat_explode_flat = flatten_df(df_flat_explode)
    display(df_flat_explode_flat.limit(10))
    

    display 函数应显示 13 个列和 2 个行。

    数据帧 df_flat_explode_flat 的函数 printSchema 返回以下结果:

    直接读取数组和嵌套结构

    使用 SQL 的无服务器模型,你可以查询和创建基于此类对象的视图和表。

    首先,用户应根据数据的存储方式使用以下分类。 以大写字母显示的所有内容都是特定于你的用例的:

    'https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet' 'Parquet' (ADLSg2) N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/;account=ACCOUNTNAME;database=DATABASENAME;collection=COLLECTIONNAME;region=REGIONTOQUERY', SECRET='YOURSECRET' 'CosmosDB' (Azure Synapse Link)

    替换每个字段,如下所示:

  • 'YOUR BULK ABOVE' 是你连接到的数据源的连接字符串。
  • 'YOUR TYPE ABOVE' 是用于连接到源的格式。
  • select *
    openrowset(
        BULK 'YOUR BULK ABOVE',
        FORMAT='YOUR TYPE ABOVE'
    with (id varchar(50),
            contextdataeventTime varchar(50) '$.context.data.eventTime',
            contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
            contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
            contextsessionisFirst varchar(50) '$.context.session.isFirst',
            contextsessionid varchar(50) '$.context.session.id',
            contextcustomdimensions varchar(max) '$.context.custom.dimensions'
    ) as q 
    cross apply openjson (contextcustomdimensions) 
    with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
                RoomName varchar(50) '$.customerInfo.RoomName',
                CustomerName varchar(50) '$.customerInfo.CustomerName',
                UserName varchar(50) '$.customerInfo.UserName'
    

    有两种不同的操作类型:

  • 下面的代码行中指示了第一种操作类型,该行定义名为 contextdataeventTime 的引用嵌套元素 Context.Data.eventTime 的列。

    contextdataeventTime varchar(50) '$.context.data.eventTime'
    

    该行定义名为 contextdataeventTime 的引用嵌套元素 Context>Data>eventTime 的列。

  • 第二种操作类型使用 cross apply 为数组下的每个元素创建新行。 然后,它定义每个嵌套对象。

    cross apply openjson (contextcustomdimensions) 
    with ( ProfileType varchar(50) '$.customerInfo.ProfileType', 
    

    如果数组有 5 个具有 4 个嵌套结构的元素,则 SQL 的无服务器模型会返回 5 个行和 4 个列。 SQL 的无服务器模型可以就地查询,在 2 个行中映射数组,并将所有嵌套结构显示成列。

  • 了解如何通过 Spark 3 查询 Synapse Link for Azure Cosmos DB
  • 了解如何通过 Spark 2 查询 Synapse Link for Azure Cosmos DB
  • 查询 Parquet 嵌套类型
  •