所谓滞后程度,就是指消费者当前落后于生产者的程度。
Lag 应该算是最最重要的监控指标了。它直接反映了一个消费者的运行情况。一个正常工作的消费者,它的 Lag 值应该很小,甚至是接近于 0 的,这表示该消费者能够及时地消费生产者生产出来的消息,滞后程度很小。反之,如果一个消费者 Lag 值很大,通常就表明它无法跟上生产者的速度,最终 Lag 会越来越大,从而拖慢下游消息的处理速度。
通常来说,Lag 的单位是消息数,而且我们一般是在主题这个级别上讨论 Lag 的,但实际上,Kafka 监控 Lag 的层级是在分区上的。如果要计算主题级别的,你需要手动汇总所有主题分区的 Lag,将它们累加起来,合并成最终的 Lag 值。
你在实际业务场景中必须时刻关注消费者的消费进度。
消费进度监控3 种方法。
-
使用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本。
-
使用 Kafka Java Consumer API 编程。
-
使用 Kafka 自带的 JMX 监控指标。
Kafka 自带命令
Kafka 自带的命令行工具
bin/kafka-consumer-groups.sh
kafka-consumer-groups 脚本是 Kafka 为我们提供的最直接的监控消费者消费进度的工具。
$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>
Kafka 连接信息就是 < 主机名:端口 > 对,而 group 名称就是你的消费者程序中设置的 group.id 值。
示例: Kafka 集群的连接信息,即 localhost:9092。消费者组名:testgroup
它会按照消费者组订阅主题的分区进行展示,每个分区一行数据;其次,除了主题、分区等信息外,它会汇报每个分区当前
最新生产的消息的位移值
(即 LOG-END-OFFSET 列值)、该消费者组
当前最新消费消息的位移值
(即 CURRENT-OFFSET 值)、
LAG 值
(前两者的差值)、
消费者实例 ID
、
消费者连接 Broker 的主机名
以及
消费者的 CLIENT-ID 信息
。
Kafka Java Consumer API [ Kafka 2.0.0 ]
代码示例:
第 1 处是调用 AdminClient.listConsumerGroupOffsets 方法获取给定消费者组的最新消费消息的位移;
第 2 处则是获取订阅分区的最新消息位移;
第3 处就是执行相应的减法操作,获取 Lag 值并封装进一个 Map 对象。
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient client = AdminClient.create(props)) {
ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
try {
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 处理中断异常
// ...
return Collections.emptyMap();
} catch (ExecutionException e) {
// 处理ExecutionException
// ...
return Collections.emptyMap();
} catch (TimeoutException e) {
throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
Kafka JMX 监控指标
Kafka 默认提供的 JMX 监控指标来监控消费者的 Lag 值
Kafka 消费者提供了一个名为
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”
的 JMX 指标,里面有很多属性。和我们今天所讲内容相关的有两组属性:
records-lag-max
和
records-lead-min
,
它们分别表示此消费者在测试窗口时间内曾经达到的
最大的 Lag 值
和
最小的 Lead 值
。
Lead 值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值
。很显然,Lag 和 Lead 是一体的两个方面:Lag 越大的话,Lead 就越小,反之也是同理。
监控到 Lag 越来越大,消费者程序变得越来越慢了,至少是追不上生产者程序了.
Lead 越来越小,甚至是快接近于 0 了,消费者端要丢消息了
Kafka 的消息是有留存时间设置的,
默认是 1 周,也就是说 Kafka 默认删除 1 周前的数据
。倘若你的消费者程序足够慢,
慢到它要消费的数据快被 Kafka 删除了,这时你就必须立即处理,否则一定会出现消息被删除,从而导致消费者程序重新调整位移值的情形
。这可能产生两个后果:一个是消费者从头消费一遍数据,另一个是消费者从最新的消息位移处开始消费,之前没来得及消费的消息全部被跳过了,从而造成丢消息的假象。
Kafka 消费者还在分区级别提供了额外的 JMX 指标,用于单独监控分区级别的 Lag 和 Lead 值。JMX 名称为:kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”。
所谓滞后程度,就是指消费者当前落后于生产者的程度。Lag 应该算是最最重要的监控指标了。它直接反映了一个消费者的运行情况。一个正常工作的消费者,它的 Lag 值应该很小,甚至是接近于 0 的,这表示该消费者能够及时地消费生产者生产出来的消息,滞后程度很小。反之,如果一个消费者 Lag 值很大,通常就表明它无法跟上生产者的速度,最终 Lag 会越来越大,从而拖慢下游消息的处理速度。...
Con
sum
er
OffsetCheck
er
在0.8.2.2版本如下
kafka
_2.10-0.8.2.2-sources.jar!/
kafka
/tools/Con
sum
er
OffsetCheck
er
.scala
object Con
sum
er
OffsetCheck
er
extends Logging {
手把手视频详细讲解项目开发全过程,需要的小伙伴自行百度网盘下载,链接见附件,永久有效。
细致简介了消息队列在大数据的应用场景、
Kafka
集群搭建、
Kafka
操作,基准测试、架构、编程、结合
Kafka
Eagle简介原理等
1,知识体系完备,从小白到大神各阶段读者均能学有所获。
2,生动形象,化繁为简,讲解通俗易懂。
3,结合工作实践及分析应用,培养解决实际问题的能力。
4,企业级方案设计,完全匹配工作场景。
1、对大数据感兴趣的在校生及应届毕业生。
2、对目前职业有进一步提升要求,希望从事大数据行业高薪工作的在职人员。
3、对大数据行业感兴趣的相关人员。
第一章 简介
1.1 消息队列简介
1.2
Kafka
简介
1.3
Kafka
的优势
1.4 哪些公司在使用
Kafka
1.5
Kafka
生态圈介绍
1.6
Kafka
版本
第二章 环境搭建
2.1 搭建
Kafka
集群
2.2 目录结构分析
2.3
Kafka
一键启动/关闭脚本
第三章 基础操作
3.1 创建topic
3.2 生产消息到
Kafka
3.3 从
Kafka
消费
消息
3.4 使用
Kafka
Tools操作
Kafka
第四章
Kafka
基准测试
第五章 Java编程操作
Kafka
5.1 同步生产消息到
Kafka
中
5.2 从
Kafka
的topic中
消费
消息
5.3 异步使用带有回调函数方法生产消息
第六章 架构
6.1
Kafka
重要概念
6.2
消费者
组
第七章.
Kafka
生产者幂等性与事务 38
7.1 幂等性
第八章 分区和副本机制
8.1 生产者分区写入策略
8.2
消费者
组Rebalance机制
8.3
消费者
分区分配策略
8.4 副本机制
第九章 高级(High Level)API与低级(Low Level)API
9.1 高级API
9.2 低级API
9.3 手动
消费
分区数据
第十章
监控
工具
Kafka
-eagle介绍
10.1
Kafka
-Eagle简介
10.2 安装
Kafka
-Eagle
10.3
Kafka
度量指标
第十一章
Kafka
原理
11.1 分区的lead
er
与follow
er
11.2
Kafka
生产、
消费
数据工作流程
11.3
Kafka
的数据存储形式
11.4 消息不丢失机制
11.5 数据积压
第十二章
Kafka
中数据清理(Log Deletion)
12.1 日志删除
12.2 日志压缩(Log Compaction)
深入浅出理解
kafka
原理系列之:
kafka
延迟检测
kafka
_con
sum
er
group_
lag
_
sum
kafka
的jmx指标
kafka
_con
sum
er
group_
lag
_
sum
kafka
_con
sum
er
group_
lag
_
sum
:查看
kafka
的topic
消费
积压的值
kafka
_con
sum
er
group_
lag
_
sum
{con
sum
er
group=“optics-stg”,topic=“debezium-stg-optics”} 0
默认值500,
消费者
消费
kafka
topic产生
消费
event streaming
central n
er
vous system 中枢神经系统
event streaming is the practice of capturing data in real time from event sources like
databases,sensors,mobile devices,cloud s
er
vices, and software applications in the form of streaming of events;
storing t
var pinf = plus.push.getClientInfo();
var cid = pinf.clientid;//客户端标识
console.log('cid=='+cid)
plus.push.addEventListen
er
("receive", function(msg) {
if ( ms
一直以来都想写一点关于
kafka
con
sum
er
的东西,特别是关于新版con
sum
er
的中文资料很少。最近
Kafka
社区邮件组已经在讨论是否应该正式使用新版本con
sum
er
替换老版本,笔者也觉得时机成熟了,于是写下这篇文章讨论并总结一下新版本con
sum
er
的些许设计理念,希望能把con
sum
er
这点事说清楚,从而对广大使用者有所帮助。
在开始之前,我想花一点时间先来明确一些概念和术语,这...
一:各节点的作用
Brok
er
在
kafka
里面就是一个数据节点
Topic是一个主题,类似于一张表或ES中的index;在
kafka
里面没有上面数据类型的说法(来的时候就是一条一条文本数据,发出去的时候就一条一条的发出去)
Partition分区,与Topic的关系是:Topic里面有多个Partitiom;Patition又分为lead
er
partition和follow
er
partition。
一般
监控
kafka
消费
情况我们可以使用现成的工具来查看,但如果发生大量延迟不能及时知道。所以问题就来了,怎么用java api 进行
kafka
的
监控
呢?
用过
kafka
都该知道 延迟量
lag
= logSize(topic记录量) - offset(
消费
组
消费
进度
)
所以我们获取到logSize / offset 就可以了。
鉴于这部分信息网上资料非常少,特地将代码抛出来。
我使用的...
<dependency>
<groupId>org.springframework.
kafka
</groupId>
<artifactId>spring-
kafka
</artifactId>
<v
er
sion>2.5.4.RELEASE</v
er
sion>
</dependency>
2. 配置
Kafka
消费者
在application.prop
er
ties文件中添加以下配置:
spring.
kafka
.con
sum
er
.bootstrap-s
er
v
er
s=localhost:9092
spring.
kafka
.con
sum
er
.group-id=my-group
spring.
kafka
.con
sum
er
.auto-offset-reset=earliest
3. 创建
Kafka
消费者
创建一个
Kafka
消费者
类,使用@
Kafka
Listen
er
注解指定要监听的主题和方法:
@Component
public class My
Kafka
Con
sum
er
{
@
Kafka
Listen
er
(topics = "my-topic", groupId = "my-group")
public void con
sum
e(String message) {
System.out.println("Received message: " + message);
4. 运行应用程序
启动应用程序并发送消息到“my-topic”主题,您应该能够在控制台上看到
消费者
接收到的消息。
以上就是整合
Kafka
消费者
的步骤,希望对您有所帮助。