1.1 起因
这段时间在进行数据库的实时接入方面的工作,并且要求在kafka上进行数据的发布和接入,所以在领导的介绍下github查看了部分CDC工具,最终根据需求选择了Debezium。
1.2 Debezium
什么是Debezium?
官网起步教程
Debezium是一个分布式平台,可将您现有的数据库转换为事件流,因此应用程序可以查看并立即响应数据库中的每个行级更改。Debezium建立在
Apache Kafka
之上,并提供
Kafka Connect
兼容连接器,用于监视特定的数据库管理系统。Debezium在Kafka日志中记录数据更改的历史记录,您的应用程序将在此位置使用它们。这使您的应用程序可以轻松,正确,完整地使用所有事件。即使您的应用程序停止(或崩溃),在重新启动时,它也会开始使用中断处的事件,因此不会丢失任何内容。(官网谷歌翻译得到的,放心食用)
1.3 使用场景
目前的数据库间的数据摆渡数据量大,并且传统意义上的定时更新任务时间较长,虽然仍然有一部分需要进行数据导入。但是对于部分增量数据的构建耗时时间极长(在获取增量数据进行排序获取的时候),但那些也是另外需要解决的问题。
同样,CDC(change data capture)适用于关系型数据库的同步问题,可以根据数据库日志的抓取进行数据库insert、update、delete操作,并进行实时的同步更新。
另外,这里不对kafka和kafka connect进行太多介绍。
2. 搭建实践
2.1 版本说明
kafka:kafka_2.12-2.4.0、zookeeper:3.4.5、mongodb:mongodb-linux-x86_64-rhel70-4.2.2
centos7 虚拟机三台、XShell、 debezium-connector-mongodb-1.0.0.Final-plugin.tar
2.2 芒果(mongodb)DB配置
解压到某目录下,进入该目录并创建文件夹data、conf、logs(自己喜欢就好,名字怎么帅怎么来).
在conf目录下创建你的conf文件,命名 xxxx.conf
dbpath=/usr/local/tools/mongodb/mongodb-linux-x86_64-rhel70-4.2.2/data/r1
#数据存放位置
logpath=/usr/local/tools/mongodb/mongodb-linux-x86_64-rhel70-4.2.2/logs/mongo27001.log
#日志存放路径
fork=true #后台运行
replSet=rs77 #副本集名称,必须一致
logappend=true #这个防止每次重启清空日志
port=27001 #mongo端口
bind_ip=0.0.0.0 #0.0.0.0表示接受任何ip请求
auth=true #开启登录认证,请在创建了用户之后再开启
keyFile=/usr/local/tools/mongodb/mongodb-linux-x86_64-rhel70-4.2.2/data/key/keyfile.key
#keyfile也是认证文件,也是要一致,好像里面的大小还不能超过1024还是多少来着,你随便生成一个
keyFile文件生成:
openssl rand -base64 200 > 你的keyFile文件位置
#别忘记给个权限
chmod 600 keyFile
# 要求是要600
目前配置基本完成,我是在一台机上分了三个端口启动了三个mongodb,所以复制3个配置文件,修改下其中的端口号就行。启动mongodb:
./bin/mongod -f conf/mongo27001.conf
./bin/mongod -f conf/mongo27002.conf
./bin/mongod -f conf/mongo27003.conf
进入mongodb
./bin/mongo -host xx.xx.xx.xx -port 27001
# 设置副本集
conf={_id : 'rs77',members : [{_id : 0, host : 'xx.xx.xx.xx:27001'},{_id : 1, host : 'xx.xx.xx.xx:27002'},{_id : 2, host : 'xx.xx.xx.xx:27003'}]}
# 初始化
rs.initiate(conf);
# 查看状态
rs.status()
啊,数据库配置挺烦。接下来问题比较大。
-
前面说debezium是根据日志文件去读取数据库改变的,所以oplog.rs在local库下。
-
local库下不允许创建用户,官方说明的。
-
-
这里注意创建用户的时候不要随便给与高的权限,实际生产环境中
admin
和
local
保存的信息是很重要的。
-
所以曲线救国,在其它数据库下创建用户并授权其对local库的read权限。
-
然后对比自己所要监测的库进行用户的权限授权。
-
数据库和测试表也建好。我这里创建了kafkatest数据库、www和uuu两张表进行测试。
-
搞定重启。
2.3 kafka connect配置
这里zookeeper和kafka的启动不作说明,对于kafka配置debezium来说其配置非常简单。
首先进入kafka的conf文件夹中找到connect-distributed.properties文件。
# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=172.168.31.79:9092,172.168.31.77:9092,172.168.31.78:9092
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 这里有这两个值,即对key和value进行格式处理,可选avro,但是要放入avro-jar包
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25
# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=1
# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#rest.port=8083
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/local/tools/kafka/kafka_2.12-2.4.0/plugs
# 这里设置plugin.path即插件包的地址,启动的时候会进行读取改目录下的jar,其实放kakfa的lib的
目录下也行,但是我试过有时候会读取不到,估计有冲突问题。
上面需要注意的还有
config.storage.topic=connect-configs #注意,这应该是一个单个的
partition
,多副本的
topic
。你需要手动创建这个
topic
,以确保是单个
partition
(自动创建的可能会有多个partition)。
status.storage.topic=connect-status #
topic
用于存储状态;这个
topic
可以有多个
partitions
和副本
offset.storage.topic=connect-offsets #
topic
用于存储
offsets
;这个topic应该配置多个
partition
和副本。
这里推荐自己先kafka进行创建。confluent官方给出创建建议:
https://docs.confluent.io/current/connect/userguide.html#connect-userguide-dist-worker-config
# config.storage.topic=connect-configs
bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
# offset.storage.topic=connect-offsets
bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
# status.storage.topic=connect-status
bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
*至此,将下载的debezium的mongodb连接器中的jar放入plugin文件夹中。
2.4 提交
在kafka connect中,官方已经给出REST API对connector进行了管理:
启动kafka connect:
./bin/connect-distributed.sh -daemon config/connect-distributed.properties
# 分布式启动,单例模式需要指定下一个代码段的配置信息
此处进行connector提交:
curl -i -X POST "Accept:application/json" -H "Content-Type:application/json" 172.168.31.77:8083/connectors/ -d '{"name": "mongodb-connector", "config": {"connector.class": "io.debezium.connector.mongodb.MongoDbConnector","task.max": "1","mongodb.hosts": "rs77/172.168.31.77:27001, rs77/172.168.31.77:27002, rs77/172.168.31.77:27003", "mongodb.user": "mongo", "mongodb.password": "123", "mongodb.authsource": "kafkatest", "mongodb.name": "rs77","database.history.kafka.bootstrap.servers": "172.168.31.77:9092, 172.168.31.78:9092, 172.168.31.79:9092","snapshot.delay.ms": "3000", "database.whitelist": "kafkatest", "topic" : "kafka-mongo"}}'
上面比较难看出来,下面规范下:
"name": "mongodb-connector", # 名字自己取
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector", # connector连接器class名称
"task.max": "1",
"mongodb.hosts": "rs77/172.168.31.77:27001, rs77/172.168.31.77:27002, rs77/172.168.31.77:27003", # 这里注意要副本集+地址端口形式,不然可能无法连接
"mongodb.user": "mongo",
"mongodb.password": "123",
"mongodb.authsource": "kafkatest", # 认证库, 即你shell登录要先use到所在的库才可以进行改库的账户登录
"mongodb.name": "rs77", # 副本集名称
"database.history.kafka.bootstrap.servers": "172.168.31.77:9092, 172.168.31.78:9092, 172.168.31.79:9092", # kafka地址
"snapshot.delay.ms": "3000",
"database.whitelist": "kafkatest" # 监测的库名,可以正则匹配
更多的不一一介绍了,官方需要的配置说的很清楚:
https://debezium.io/documentation/reference/connectors/mongodb.html#example-configuration
最后,提交之后会在shell页面显示:
HTTP/1.1 201 Created
Date: Mon, 20 Jan 2020 03:22:28 GMT
Location: http://172.168.31.77:8083/connectors/mongodb-connector
Content-Type: application/json
Content-Length: 594
Server: Jetty(9.4.20.v20190813)
记得及时查看connect的日志文件(在自己配置的logs文件夹下),查看是否有错误抛出。
2.5 测试
至此,如果你没有设置kafka的topic自动创建关闭的话,你就会看到自动创建了有关你的库下自己填写需要监控的表的topic(这里推荐先自己创建),比如我这里监控了该库下面所有的表,所以自动生成了两个表:
接下来进行副本集PEIMARY端选一个进行测试,
呃,消费端可以消费到数据库的变动,大功告成。
至于消费端的json里面的含义(官方传送门):https://debezium.io/documentation/reference/connectors/mongodb.html#events
3. 总结
- 感谢debezium开发组,有空会去看源码。
- debezium在kafka connect配置端的配置极其简单(jar复制粘贴)。
- 数据库端配置较为繁琐。
- 一定要注意数据库权限问题。
- 另外官网教程很详细但是实际遇到问题大部分是数据库那边的配置问题。
4. 后记
后面会逐渐推出debezium其他数据库的配置(躺坑)过程。并对实际数据接入监控方面出现了的问题进行总结。
本文可能存在不合理的地方,欢迎各位指正。最后一句Google大法好。
目录一、简介1.1 起因1.2 Debezium1.3 使用场景2. 搭建实践2.1 版本说明2.2 芒果(mongodb)DB配置2.3 kafka connect配置2.4 提交2.5 测试3. 总结4. 后记一、简介1.1 起因这段时间在进行数据库的实时接入方面的工作,并且要求在kafka上进行数据的发布和接入,所以在领导的介绍下g...
Debezium 的 MongoDB 连接器跟踪 MongoDB 副本集或 MongoDB 分片集群以查找数据库和集合中的文档变化,并将这些变化记录为 Kafka 主题中的事件。连接器会自动处理分片集群中片的添加或删除、每个副本集成员的变化、每个副本集合中的成员选举,以及等待通信问题解决。
有关与此连接器兼容的 MongoDB 版本的信息,请参阅Debezium 版本概述。
MongoDB 的复制机制提供了冗余和高可用性,是在生产环境中运行 MongoDB 的最佳方式。MongoDB 连接器可以捕获副
1.1 Debezium 介绍
Debezium 是一个分布式的平台 ,注册source connector 用于源集群和kafka进行连接, 捕获原集群数据库中的更改记录,并将更改记录进行解析成相应的格式,以消息的形式保存到kafka中,
也可以注册sink connector 用于kafka 与 目的集群进行连接,消费kafka中的数据记录,并解析成数据库中能插入的sql语句。
下图为Debezium的整个架构,Debezium以插件的形式,部署在 Kafka Connect 上,Kafka Co
Kafaka connect 是一种用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。它使得能够快速定义将大量数据集合移入和移出Kafka的连接器变得简单。Kafka Connect可以从数据库或应用程序服务器收集数据到Kafka topic,使数据可用于低延迟的流处理。导出作业可以将数据从Kafka topic传输到二次存储和查询系统,或者传递到批处理系统以进行离线分析。
Kafaka connect的核心组件:Source:负责将外部数据写入到kafka的topic中。Sink:负责从
mongosync是用于MongoDB复制集之间,复制集到分片集群之间以及分片集群与分片集群之间同步数据的一个工具。
1.实时迁移,尤其是从一个集群迁移到另一个集群,或者master-slave架构迁移到replica sets架构
2.实时同步,比如同步数据到其他集群。
3.其他场景
mongosync特点及功能增强
1.极速(ssd环境最大能达到百万每秒)、易用;
2.支持全量同步,增量同步,支持同步单库、单集合
本人未实时同步成功!
运行环境:centos
方法:/data/mongosync -h server1:30000 --to server2:40000 -d 库名 -c 集合名
更多方法baidu google
本文的上半部分: Debezium connector for MySQL 基本概念
5. 部署
要部署 Debezium MySQL 连接器,您需要安装 Debezium MySQL 连接器插件程序,配置连接器,然后通过将其配置添加到 Kafka Connect 来启动连接器。
已安装Apache Zookeeper、Apache Kafka和Kafka Connect。
MySQL 服务器已安装并设置为与 Debezium 连接器一起使用
Debezium connector for MySQL
MySQL 的 binlog 会按照事务提交的顺序记录所有的操作变更。这些变更既包含 表 schema的变更也包含 数据的变更。MySQL 使用binlog来复制和恢复数据。
Debezium MySQL 连接器读取 binlog,为行级INSERT,UPDATE和DELETE操作生成更改事件,并将更改事件发送到 Kafka 主题。客户端应用程序读取这些 Kafka 主题。
由于 MySQL 通常设置为在指定时间段后清除 binlog,因此 MySQ
https://debezium.io/
https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.1.0.CR1/debezium-connector-mysql-1.1.0.CR1-plugin.tar.gz
https://repo1.maven.org/maven2/io/debezium/debezium...
最近在研究debezium相关功能,因为官网上推荐使用kafka的方式,但是kafka比较繁琐,使用起来配置比较麻烦,所以使用嵌入式方式摆脱kafka的束缚,以下是嵌入式相关配置:
* Debezium 配置.
* MongoDB
* @return configuration
@Bean
io.debezium.config.Configuration debeziumConfig() {
ret...
可以使用Flask-APScheduler在MongoDB中实现定时任务,只需要在Flask应用中定义一个定时任务,并将其配置为在MongoDB中运行。具体代码如下:from flask_apscheduler import APSchedulerscheduler = APScheduler()# Configure the scheduler to use MongoDB as its job store
scheduler.add_jobstore('mongodb', host='localhost',
database='your_database_name')@scheduler.task('interval', id='do_job_1', seconds=30)
def job_1():
print("Job 1 executed")@scheduler.task('cron', id='do_job_2', day_of_week='mon-sun', hour='12', minute='30')
def job_2():
print("Job 2 executed")# Start the scheduler
scheduler.start()