可以使用
MERGE
SQL 操作将源表、视图或 DataFrame 中的数据更新插入目标 Delta 表。 Delta Lake 支持
MERGE
中的插入、更新和删除,并支持超出 SQL 标准的扩展语法以辅助高级用例。
假设你有一个名为
people10mupdates
的源表或
/tmp/delta/people-10m-updates
处的源路径,其中包含名为
people10m
的目标表或
/tmp/delta/people-10m
处目标路径的新数据。 其中一些新记录可能已经存在于目标数据中。 要合并新数据,需要更新该人的
id
已经存在的行,并在不存在匹配的
id
的地方插入新行。 可以运行以下查询:
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
firstName,
middleName,
lastName,
gender,
birthDate,
salary
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
.whenMatchedUpdate(set =
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
.whenNotMatchedInsert(values =
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
.whenNotMatched
.insertExpr(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
.execute()
请参阅 Delta Lake API 文档,了解 Scala 和 Python 语法详细信息。 有关 SQL 语法详细信息,请参阅 MERGE INTO
使用合并修改所有不匹配的行
在 Databricks SQL 和 Databricks Runtime 12.2 LTS 及更高版本中,可以使用 WHEN NOT MATCHED BY SOURCE 子句对目标表中的、在源表中没有相应记录的记录执行 UPDATE 或 DELETE 操作。 Databricks 建议添加可选条件子句,以避免完全重写目标表。
以下代码示例显示了将此子句用于删除操作、使用源表的内容覆盖目标表,以及删除目标表中不匹配的记录的基本语法。 有关源更新和删除操作有时限的表的更具可缩放性模式,请参阅将 Delta 表与源逐步同步。
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
以下示例将条件添加到 WHEN NOT MATCHED BY SOURCE 子句并指定要在不匹配的目标行中更新的值。
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
.execute()
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
合并操作语义
下面是 merge 编程操作语义的详细说明。
可以有任意数量的 whenMatched 和 whenNotMatched 子句。
当源行根据匹配条件与目标表行匹配时,将执行 whenMatched 子句。 这些子句具有以下语义。
whenMatched 子句最多可以有 1 个 update 和 1 个 delete 操作。 merge 中的 update 操作只更新匹配目标行的指定列(类似于 update 操作)。 delete 操作删除匹配的行。
每个 whenMatched 子句都可以有一个可选条件。 如果存在此子句条件,则仅当该子句条件成立时,才对任何匹配的源-目标行对执行 update 或 delete 操作。
如果存在多个 whenMatched 子句,则会按照它们的指定顺序对其进行求值。 除最后一个之外,所有 whenMatched 子句都必须具有条件。
如果对于匹配合并条件的源行和目标行对,没有任何 whenMatched 条件的计算结果为 true,则目标行保持不变。
若要使用源数据集的相应列更新目标 Delta 表的所有列,请使用 whenMatched(...).updateAll()。 这等效于:
whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
针对目标 Delta 表的所有列。 因此,此操作假定源表的列与目标表的列相同,否则查询将引发分析错误。
启用自动架构迁移后,此行为将发生变化。 有关详细信息,请参阅自动架构演变。
当源行根据匹配条件与任何目标行都不匹配时,将执行 whenNotMatched 子句。 这些子句具有以下语义。
whenNotMatched 子句只能具有 insert 操作。 新行是基于指定的列和相应的表达式生成的。 你无需指定目标表中的所有列。 对于未指定的目标列,将插入 NULL。
每个 whenNotMatched 子句都可以有一个可选条件。 如果存在子句条件,则仅当源条件对该行成立时才插入该行。 否则,将忽略源列。
如果存在多个 whenNotMatched 子句,则会按照它们的指定顺序对其进行求值。 除最后一个之外,所有 whenNotMatched 子句都必须具有条件。
若要使用源数据集的相应列插入目标 Delta 表的所有列,请使用 whenNotMatched(...).insertAll()。 这等效于:
whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
针对目标 Delta 表的所有列。 因此,此操作假定源表的列与目标表的列相同,否则查询将引发分析错误。
启用自动架构迁移后,此行为将发生变化。 有关详细信息,请参阅自动架构演变。
当源行根据匹配条件与任何目标行都不匹配时,将执行 whenNotMatchedBySource 子句。 这些子句具有以下语义。
whenNotMatchedBySource 子句可以指定 delete 和 update 操作。
每个 whenNotMatchedBySource 子句都可以有一个可选条件。 如果存在子句条件,则仅当目标条件对该行成立时才修改该行。 否则,目标行保持不变。
如果存在多个 whenNotMatchedBySource 子句,则会按照它们的指定顺序对其进行求值。 除最后一个之外,所有 whenNotMatchedBySource 子句都必须具有条件。
根据定义,whenNotMatchedBySource 子句没有可从中拉取列值的源行,因此无法引用源列。 对于要修改的每一列,可以指定文本或对目标列执行操作,例如 SET target.deleted_count = target.deleted_count + 1。
如果源数据集的多行匹配,并且合并尝试更新目标 Delta 表的相同行,则 merge 操作可能会失败。 根据合并的 SQL 语义,这种更新操作模棱两可,因为尚不清楚应使用哪个源行来更新匹配的目标行。 你可以预处理源表来消除出现多个匹配项的可能性。
仅当视图已定义为 CREATE VIEW viewName AS SELECT * FROM deltaTable 时,才能对 SQL VIEW 应用 SQL MERGE 操作。
写入 Delta 表时进行重复数据删除
一个常见的 ETL 用例是通过将日志附加到表中来将其收集到 Delta 表中。 但是,源通常可以生成重复的日志记录,因此需要下游重复数据删除步骤来处理它们。 通过 merge,你可以避免插入重复记录。
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
包含新日志的数据集需要在其内部进行重复数据删除。 根据合并的 SQL 语义,该数据集会将新数据与表中的现有数据进行匹配并删除重复数据,但如果新数据集中存在重复数据,则将插入。 因此,在合并到表之前,请对新数据进行重复数据删除。
如果你知道只在几天之内有重复记录,则可以通过按日期对表进行分区,然后指定要匹配的目标表的日期范围来进一步优化查询。
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
这种方法比使用前面的命令更有效,因为它仅在日志的最后 7 天而不是整个表中查找重复项。 此外,你还可以将此 insert-only merge 与结构化流式处理一起使用,以执行日志的连续重复数据删除。
在流式处理查询中,可以使用 foreachBatch 中的 merge 操作将具有重复数据删除功能的所有流数据连续写入 Delta 表。 请参阅以下流式处理查询示例,了解有关 foreachBatch 的详细信息。
在另一个流式处理查询中,你可以从此 Delta 表中连续读取重复数据删除的数据。 这是可能的,因为 insert-only merge 仅将新数据附加到 Delta 表。
使用 Delta Lake 的缓慢变化数据 (SCD) 和变更数据捕获 (CDC)
Delta Live Tables 原生支持跟踪和应用 SCD 类型 1 和类型 2。 将 APPLY CHANGES INTO 与 Delta Live Tables 配合使用可确保在处理 CDC 源时正确处理无序记录。 请参阅APPLY CHANGES API:使用增量实时表简化变更数据捕获。
将 Delta 表与源逐步同步
在 Databricks SQL 和 Databricks Runtime 12.2 LTS 及更高版本中,可以使用 WHEN NOT MATCHED BY SOURCE 创建任意条件以自动删除和替换表的一部分。 如果所用源表的记录可能会在初始数据输入后的几天内更改或删除,但最终会稳定到最终状态,则这会特别有用。
以下查询显示使用此模式从源中选择 5 天的记录,更新目标中的匹配记录,将新记录从源插入目标,并删除目标中过去 5 天的所有不匹配记录。
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
通过在源表和目标表上提供同一布尔筛选器,可以将更改从源表动态传播到目标表,包括删除。
尽管可以在不使用任何条件子句的情况下使用此模式,但这将会完全重写目标表,从而可能导致较高的开销。
即将发布:在整个 2024 年,我们将逐步淘汰作为内容反馈机制的“GitHub 问题”,并将其取代为新的反馈系统。 有关详细信息,请参阅:https://aka.ms/ContentUserFeedback。
提交和查看相关反馈