set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
为表启用更改数据馈送选项后,无法再使用 Databricks Runtime 8.1 或更低版本写入该表。 始终可以读取该表。
仅记录启用更改数据馈送后所做的更改;不会捕获之前对表所做的更改。
更改数据存储
Azure Databricks 将 UPDATE、DELETE 和 MERGE 操作的更改数据记录在表目录下的 _change_data 文件夹中。 某些操作(如仅插入操作和完整分区删除)不会在 _change_data 目录中生成数据,因为 Azure Databricks 可以直接从事务日志高效计算更改数据馈送。
_change_data 文件夹中的文件遵循表的保留策略。 因此,如果运行 VACUUM 命令,也会删除更改数据馈送数据。
在批处理查询中读取更改
可在开始和结束时提供版本或时间戳。 开始和结束版本以及时间戳包含在查询中。 若要读取从表的特定开始版本到最新版本的更改,仅指定起始版本或时间戳。
将版本指定为整数,将时间戳指定为字符串,格式为 yyyy-MM-dd[ HH:mm:ss[.SSS]]。
如果提供的版本较低或提供的时间戳早于已记录更改事件的时间戳,那么启用更改数据馈送时,会引发错误,指示未启用更改数据馈送。
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')
Python
# version as ints or longs
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# path based tables
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.load("pathToMyDeltaTable")
Scala
// version as ints or longs
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
// path based tables
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.load("pathToMyDeltaTable")
在流式处理查询中读取更改
Python
# providing a starting version
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# providing a starting timestamp
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", "2021-04-21 05:35:43") \
.load("/pathToMyDeltaTable")
# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.table("myDeltaTable")
Scala
// providing a starting version
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
// providing a starting timestamp
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "2021-04-21 05:35:43")
.load("/pathToMyDeltaTable")
// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.table("myDeltaTable")
若要在读取表的同时获取更改数据,请将选项 readChangeFeed 设置为 true。
startingVersion 或 startingTimestamp 是可选项,如果未提供,则流会在流式传输时将表的最新快照返回为 INSERT,并将未来的更改返回为更改数据。
读取更改数据时还支持速率限制(maxFilesPerTrigger、maxBytesPerTrigger)和 excludeRegex 等选项。
对于除起始快照版本之外的版本,速率限制可以是原子性的。 也就是说,整个提交版本将受到速率限制,或者将返回整个提交。
默认情况下,如果用户传入的版本或时间戳超过了表的最后一次提交,则会引发错误 timestampGreaterThanLatestCommit。 在 Databricks Runtime 11.3 LTS 及更高版本中,如果用户将以下配置设置为 true,则更改数据馈送可以处理范围外版本的情况:
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
如果提供的起始版本大于表上的最后一次提交,或起始时间戳比表上的最后一次提交更晚,则启用上述配置时,将返回空读取结果。
如果提供的结束版本大于表上的最后一次提交,或者结束时间戳比表上的最后一次提交更早,那么在批量读取模式下启用上述配置时,将返回起始版本和最后一次提交之间的所有更改。
什么是更改数据馈送的架构?
当你读取表的更改数据馈送时,将使用最新表版本的架构。
完全支持大多数架构更改和演变操作。 启用了列映射的表不支持所有用例,它们会表现出不同的行为。 请参阅启用了列映射的表的更改数据馈送限制。
除了 Delta 表架构中的数据列之外,更改数据馈送还包含用于标识更改事件类型的元数据列:
启用了列映射的表的更改数据馈送限制
在 Delta 表上启用列映射后,可以删除或重命名表中的列,而无需重写现有数据的数据文件。 如果启用了列映射,在执行非添加性架构更改(例如重命名或删除列、更改数据类型或更改可为 null 性)后,更改数据馈送存在限制。
你无法使用批处理语义读取发生非添加性架构更改的事务或范围的更改数据馈送。
在 Databricks Runtime 13.0 及更低版本中,启用了列映射且经历过非添加性架构更改的表不支持对更改数据馈送执行流式读取。 请参阅使用列映射和架构更改进行流式处理。
在 Databricks Runtime 12.0 及更低版本中,无法读取启用了列映射且经历过列重命名或删除操作的表的更改数据馈送。
在 Databricks Runtime 12.1 及更高版本中,可以对启用了列映射且经历过非添加性架构更改的表的更改数据馈送执行批量读取。 读取操作不使用表的最新版本的架构,而是使用查询中指定的表的最终版本的架构。 如果指定的版本范围涵盖非添加性架构更改,查询仍会失败。
常见问题解答 (FAQ)
启用更改数据馈送会产生多大的开销?
没有明显的影响。 更改数据记录是在查询执行过程中以内联方式生成的,通常比重写文件的总大小要小得多。
更改记录的保留策略是什么?
更改记录遵循的保留策略与过时的表版本相同,如果更改记录超过了指定的保留期,会通过 VACUUM 将其清理。
新记录何时在更改数据馈送中提供?
更改数据是连同 Delta Lake 事务一起提交的,当新数据出现在表中时,更改数据也会出现在其中。
笔记本示例:使用增量更改数据馈送来传播更改
此笔记本展示如何将对有关疫苗接种绝对数量的银色表所做的更改传播到有关疫苗接种率的金色表。
更改数据馈送笔记本
获取笔记本