起因,在使用mirror-maker从生产集群topic往测试集群copy数据时,报错如下:
[2018-10-23 10:21:47,821] FATAL [mirrormaker-thread-2] Mirror maker thread failure due to (kafka.tools.MirrorMaker$MirrorMakerThread)
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for UMETRIP_ACTIVITY_COMPLETE_TOPIC-22: 30048 ms has passed since batch creation plus linger time
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
//.......
结合测试集群在kafkaManager上的broker列表显示hostname而非ip,怀疑mirrorMaker连接测试集群时DNS解析hostname失败(预计可通过mirror-maker输出debug日志验证)。
对比了一下生产和测试两套集群在zookeeper上的注册信息:
生产broker:
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.237.64.61:9092"],"jmx_port":9999,"host":"10.237.64.61","timestamp":"1537161676764","port":9092,"version":4}
cZxid = 0x40000172f
ctime = Mon Sep 17 13:21:17 CST 2018
mZxid = 0x40000172f
mtime = Mon Sep 17 13:21:17 CST 2018
pZxid = 0x40000172f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x2009fcbb3900007
dataLength = 196
numChildren = 0
测试broker:
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://j1215353563-10:9092"],"jmx_port":9095,"host":"j1215353563-10","timestamp":"1539938435069","port":9092,"version":4}
cZxid = 0x1001b04e3
ctime = Fri Oct 19 16:40:20 CST 2018
mZxid = 0x1001b04e3
mtime = Fri Oct 19 16:40:20 CST 2018
pZxid = 0x1001b04e3
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x300386fd8634952
dataLength = 200
numChildren = 0
从zk里的信息看,测试集群broker把自己的hostname注册到了/brokers/ids/x节点下。看到网上有人建议将 hostname ip 的映射放进 /etc/hosts 下,试了一下并不起作用。另外,生产集群机器/etc/hosts下并没有特殊添加hostname,所以怀疑新搭建的测试集群配置出了问题。
把关注点放回到server.properties文件,在里面找到了有关advertised.listeners属性的配置说明,写的很清楚:advertised.listeners需要配置,如果不配置会使用listeners属性,如果listeners也不配置,通过默认的方式获取:java.net.InetAddress.getCanonicalHostName(),该方法预计会返回hostname。
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
对比看了下两套环境的配置,生产broker有配 listeners,测试把这个配置漏掉了。
listeners=PLAINTEXT://10.237.64.61:9092
测试机器添加配置后,问题解决。
起因,在使用mirror-maker从生产集群topic往测试集群copy数据时,报错如下:[2018-10-23 10:21:47,821] FATAL [mirrormaker-thread-2] Mirror maker thread failure due to (kafka.tools.MirrorMaker$MirrorMakerThread)java.util.concur...
[lcc@lcc ~/soft/
kafka
/
kafka
_2.11-1.1.0_author_scram]$ bin/
zookeeper
-shell.sh lcc:2181
Connecting to lcc:2181
Welcome to
ZooKeeper
!
JLine support is disabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[cluster,
broker
s,
zookeeper
kafka
-
broker
-发现
从发现代理。
通常,您通过为集群中的一部分
Kafka
代理提供连接字符串来初始化
Kafka
Producer对象; 例如
kafka
1:2222,
kafka
2:2222 。 从这里开始,主题、分区和副本信息用于引导与集群的连接。
如果您的应用程序已经连接到
Zookeeper
,那么此代码会自动为您发现
Kafka
代理的子集。
String
zookeeper
Host = "
zookeeper
1 " ;
String
zookeeper
Port = " 4444 " ;
Kafka
Broker
Discoverer discoverer = new
Kafka
Broker
Discoverer (
zookeeper
Host,
zookeeper
Port);
discoverer . getConnectionString();
discover
前段时间,遇到一个
kafka
集群部署在k8s中,而
kafka
客户端在另一个k8s集群的pod的容器中,
kafka
集群中配置的advertised_listeners便是
hostname
。然而当时我只知道
kafka
集群的ip地址,当在客户端直接使用ip操作
broker
的时候便报错无法解析一个
hostname
。
当时不知道什么原因,只听同事说需要在
kafka
客户端对应的yaml文件中添加HostAliases字段然后重新部署
kafka
客户端所在的pod。 然后便可以访问了。
不明所以然的我,查了
broker
.id 默认值:无
每一个
broker
都有一个唯一的id,这是一个非负整数,这个id就是
broker
的"名字",这样就允许
broker
迁移到别的
机器
而不会影响消费者。你可以选择任意一个数字,只要它是唯一的。
log.dirs 默认值:/tmp/
kafka
-logs
一个用逗号分隔的目录列表,可以有多个,用来为
Kafka
存储数据。每当需要为一个新的partition分配...
kafka
,分布式集群架构下,高性能的流式事件数据(主要是消息)集成、发布(生产)和订阅(分发、消费)组件(中间件)。
kafka
依赖zooeeper(数据后端),这里有Windows下安装配置启动
zookeeper
的 文章(1):
Windows 10环境
zookeeper
单机伪集群部署和配置_Zhang Phil-CSDN博客Windows 10环境
zookeeper
单机伪集群部署和配置1,首先到
zookeeper
项目主页地址下载项目包,https://archive.apache.org/dist
同事在他们的服务器上读取
kafka
的信息,使用ip的情况下,端口是通的情况下始终读不到消息。使用zk的ip能list出topic。想到有可能是读取
broker
的时候读到的主机名了。配置/etc/hosts后,能顺利读到消息。
回去
kafka
对应的zk中查看 /
kafka
/
broker
/ids/xxx值,存的果然是主机名。在CDH集群中配置每个
broker
的
kafka
.properties高级值:...
有幸使用
Kafka
Mirror Maker来同步两个地方的数据,以下的是个人笔记记录(未完)。
开启JMX
使用JMX监控
Kafka
的敏感数据需要开启JMX端口,可以通过设置环境变量来实现。我自己的做法是在
kafka
-server-start.sh脚本文件中添加如下配置
if [ "x$
KAFKA
_HEAP_OPTS" = "x" ]; then
export
KAFKA
_HEA...
1.1 简介
对于
kafka
数据的迁移使用
kafka
内置的MirrorMaker工具。
MirrorMaker是
Kafka
附带的一个用于在
Kafka
集群之间制作镜像数据的工具。该工具从源集群中消费并生产到目标群集。这种镜像的常见用例是在另一个数据中心提供副本。
Kafka
的镜像功能可以维护现有
Kafka
集群的副本。
下图显示了如何使用MirrorMaker工具将源
Kafka
集群镜像到目标
Kafka
集群。该工具使用
Kafka
消费者来