适用于: NoSQL

若要从关系数据库迁移到 Azure Cosmos DB for NoSQL,可能需要更改数据模型以进行优化。

一种常见的转换方法是,通过将相关子项嵌入到一个 JSON 文档来反规范化数据。 本文探讨使用 Azure 数据工厂或 Azure Databricks 实现此目的的几个选项。 有关 Azure Cosmos DB 的数据建模的详细信息,请参阅 Azure Cosmos DB 中的数据建模

假设 SQL 数据库中包含以下两个表:Orders 和 OrderDetails。

我们希望在迁移期间,将此一对多关系合并到一个 JSON 文档中。 若要创建单个文档,请使用 FOR JSON 创建 T-SQL 查询:

SELECT
  o.OrderID,
  o.OrderDate,
  o.FirstName,
  o.LastName,
  o.Address,
  o.City,
  o.State,
  o.PostalCode,
  o.Country,
  o.Phone,
  o.Total,
  (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;

此查询的结果将包括 Orders 表中的数据:

理想情况下,你希望使用单个 Azure 数据工厂 (ADF) 复制活动来查询用作源的 SQL 数据,并将输出作为适当的 JSON 对象直接写入 Azure Cosmos DB 接收器。 目前,无法在一个复制活动中执行所需的 JSON 转换。 如果我们尝试将上述查询的结果复制到 Azure Cosmos DB for NoSQL 容器中,将会看到文档的字符串属性形式的 OrderDetails 字段,而不是预期的 JSON 数组。

可通过以下方式之一解决当前的这种限制:

  • 使用包含两个复制活动的 Azure 数据工厂
    1. 将 SQL 中的 JSON 格式的数据提取到位于中间 Blob 存储位置的某个文本文件
    2. 将 JSON 文本文件中的数据加载到 Azure Cosmos DB 中的某个容器。
    3. 使用 Azure Databricks 从 SQL 中读取数据并将其写入 Azure Cosmos DB - 我们将演示这两个选项。
    4. 让我们更详细地了解这些方法:

      Azure 数据工厂

      尽管我们无法将 OrderDetails 作为 JSON 数组嵌入到目标 Azure Cosmos DB 文档中,但可以使用两个独立的复制活动来解决该问题。

      复制活动 #1:SqlJsonToBlobText

      对于源数据,我们使用 SQL 查询通过 SQL Server OPENJSON 和 FOR JSON PATH 功能获取结果集,该结果集以单列的形式提供,每行包含一个 JSON 对象(表示订单):

      SELECT [value] FROM OPENJSON(
        (SELECT
          id = o.OrderID,
          o.OrderDate,
          o.FirstName,
          o.LastName,
          o.Address,
          o.City,
          o.State,
          o.PostalCode,
          o.Country,
          o.Phone,
          o.Total,
          (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
         FROM Orders o FOR JSON PATH)
      

      对于 SqlJsonToBlobText 复制活动的接收器,我们选择了“分隔文本”并将其指向 Azure Blob 存储中的特定文件夹。 此接收器包含一个动态生成的唯一文件名(例如 @concat(pipeline().RunId,'.json'))。 我们的文本文件实际上并不是“分隔的”,并且我们不希望使用逗号将其分析成单独的列。 我们还希望保留双引号 (""),并将“列分隔符”设置为制表符 ("\t") 或数据中未出现其他字符,然后将“引号字符”设置为“无引号字符”。

      复制活动 #2:BlobJsonToCosmos

      接下来,我们修改 ADF 管道:添加第二个复制活动,用于在 Azure Blob 存储中查找第一个活动创建的文本文件。 第二个复制活动将结果作为“JSON”源进行处理,将文本文件中找到的每个 JSON 行作为一个文档插入到 Azure Cosmos DB 接收器中。

      (可选)我们还将一个“删除”活动添加到了管道,以便在每次运行之前删除 /Orders/ 文件夹中剩余的所有旧文件。 现在,我们的 ADF 管道如下所示:

      触发上述管道后,会看到中间 Azure Blob 存储位置创建了一个文件,其中的每行包含一个 JSON 对象:

      我们还会看到 Orders 文档,其中适当嵌入的 OrderDetails 已插入到 Azure Cosmos DB 集合中:

      Azure Databricks

      我们还可以在 Azure Databricks 中使用 Spark,将 SQL 数据库源中的数据复制到 Azure Cosmos DB 目标,而无需在 Azure Blob 存储中创建中间文本/JSON 文件。

      为简单清晰起见,代码片段显式包含了虚拟数据库密码,但在理想情况下,你应使用 Azure Databricks 机密。

      首先,创建所需的 SQL 连接器 并将 Azure Cosmos DB 连接器库并连接到 Azure Databricks 群集。 重启群集以确保加载库。

      接下来,我们为 Scala 和 Python 提供了两个示例。

      Scala

      在这里,我们从数据帧中获取输出为“FOR JSON”的 SQL 查询结果:

      // Connect to Azure SQL /connectors/sql/
      import com.microsoft.azure.sqldb.spark.config.Config
      import com.microsoft.azure.sqldb.spark.connect._
      val configSql = Config(Map(
        "url"          -> "xxxx.database.windows.net",
        "databaseName" -> "xxxx",
        "queryCustom"  -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
        "user"         -> "xxxx", 
        "password"     -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
      // Create DataFrame from Azure SQL query
      val orders = sqlContext.read.sqlDB(configSql)
      display(orders)
      

      接下来,我们将连接到 Azure Cosmos DB 数据库和集合:

      // Connect to Azure Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
      import org.joda.time._
      import org.joda.time.format._
      import com.microsoft.azure.cosmosdb.spark.schema._
      import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
      import com.microsoft.azure.cosmosdb.spark.config.Config
      import org.apache.spark.sql.functions._
      import org.joda.time._
      import org.joda.time.format._
      import com.microsoft.azure.cosmosdb.spark.schema._
      import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
      import com.microsoft.azure.cosmosdb.spark.config.Config
      import org.apache.spark.sql.functions._
      val configMap = Map(
        "Endpoint" -> "https://xxxx.documents.azure.com:443/",
        // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
        "Masterkey" -> "xxxx",
        "Database" -> "StoreDatabase",
        "Collection" -> "Orders")
      val configAzure Cosmos DB= Config(configMap)
      

      最后,我们定义架构并使用 from_json 应用数据帧,然后将数据帧保存到 Cosmos DB 集合。

      // Convert DataFrame to proper nested schema
      import org.apache.spark.sql.types._
      val orderDetailsSchema = ArrayType(StructType(Array(
          StructField("OrderDetailId",IntegerType,true),
          StructField("ProductId",IntegerType,true),
          StructField("UnitPrice",DoubleType,true),
          StructField("Quantity",IntegerType,true)
      val ordersWithSchema = orders.select(
        col("OrderId").cast("string").as("id"),
        col("OrderDate").cast("string"),
        col("FirstName").cast("string"),
        col("LastName").cast("string"),
        col("Address").cast("string"),
        col("City").cast("string"),
        col("State").cast("string"),
        col("PostalCode").cast("string"),
        col("Country").cast("string"),
        col("Phone").cast("string"),
        col("Total").cast("double"),
        from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
      display(ordersWithSchema)
      // Save nested data to Azure Cosmos DB
      CosmosDBSpark.save(ordersWithSchema, configCosmos)
      

      Python

      作为替代方法,如果源数据库不支持 FOR JSON 或类似操作,则你可能需要在 Spark 中执行 JSON 转换。 或者,可以对大型数据集使用并行操作。 这里提供了一个 PySpark 示例。 首先配置第一个单元中的源数据库和目标数据库连接:

      import uuid
      import pyspark.sql.functions as F
      from pyspark.sql.functions import col
      from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
      #JDBC connect details for SQL Server database
      jdbcHostname = "jdbcHostname"
      jdbcDatabase = "OrdersDB"
      jdbcUsername = "jdbcUsername"
      jdbcPassword = "jdbcPassword"
      jdbcPort = "1433"
      connectionProperties = {
        "user" : jdbcUsername,
        "password" : jdbcPassword,
        "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
      #Connect details for Target Azure Cosmos DB for NoSQL account
      writeConfig = {
          "Endpoint": "Endpoint",
          "Masterkey": "Masterkey",
          "Database": "OrdersDB",
          "Collection": "Orders",
          "Upsert": "true"
      

      然后,我们将查询源数据库(在本案例中为 SQL Server)中的订单及订单明细记录,并将结果放入 Spark 数据帧。 我们还将创建一个列表,其中包含所有订单 ID 和并行操作的线程池:

      import json
      import ast
      import pyspark.sql.functions as F
      import uuid
      import numpy as np
      import pandas as pd
      from functools import reduce
      from pyspark.sql import SQLContext
      from pyspark.sql.types import *
      from pyspark.sql import *
      from pyspark.sql.functions import exp
      from pyspark.sql.functions import col
      from pyspark.sql.functions import lit
      from pyspark.sql.functions import array
      from pyspark.sql.types import *
      from multiprocessing.pool import ThreadPool
      #get all orders
      orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
      #get all order details
      orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
      #get all OrderId values to pass to map function 
      orderids = orders.select('OrderId').collect()
      #create thread pool big enough to process merge of details to orders in parallel
      pool = ThreadPool(10)
      

      然后,创建一个函数用于将 Orders 写入目标 API for NoSQL 集合。 此函数筛选给定订单 ID 的所有订单详细信息,将其转换为 JSON 数组,并将该数组插入 JSON 文档。 然后,该 JSON 文档将按该顺序写入目标 API for NoSQL 容器:

      def writeOrder(orderid):
        #filter the order on current value passed from map function
        order = orders.filter(orders['OrderId'] == orderid[0])
        #set id to be a uuid
        order = order.withColumn("id", lit(str(uuid.uuid1())))
        #add details field to order dataframe
        order = order.withColumn("details", lit(''))
        #filter order details dataframe to get details we want to merge into the order document
        orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
        #convert dataframe to pandas
        orderpandas = order.toPandas()
        #convert the order dataframe to json and remove enclosing brackets
        orderjson = orderpandas.to_json(orient='records', force_ascii=False)
        orderjson = orderjson[1:-1] 
        #convert orderjson to a dictionaory so we can set the details element with order details later
        orderjsondata = json.loads(orderjson)
        #convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
        if (orderdetailsgroup.count() !=0):
          #convert orderdetailsgroup to pandas dataframe to work better with json
          orderdetailsgroup = orderdetailsgroup.toPandas()
          #convert orderdetailsgroup to json string
          jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
          #convert jsonstring to dictionary to ensure correct encoding and no corrupt records
          jsonstring = json.loads(jsonstring)
          #set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order 
          orderjsondata['details'] = jsonstring
        #convert dictionary to json
        orderjsondata = json.dumps(orderjsondata)
        #read the json into spark dataframe
        df = spark.read.json(sc.parallelize([orderjsondata]))
        #write the dataframe (this will be a single order record with merged many-to-one order details) to Azure Cosmos DB db using spark the connector
        #https://learn.microsoft.com/azure/cosmos-db/spark-connector
        df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()
      

      最后,我们将在线程池上使用映射函数调用要并行执行的 Python writeOrder 函数,并传入先前创建的订单 ID 列表:

      #map order details to orders in parallel using the above function
      pool.map(writeOrder, orderids)
      

      在这两种方法中,最终都将在 Azure Cosmos DB 集合中的每个 Order 文档中获得正确保存的嵌入 OrderDetails:

    5. 了解 Azure Cosmos DB 中的数据建模
    6. 了解如何在 Azure Cosmos DB 中为数据建模和分区
  •