Offset commit failed with a retriable exception. You should retry committing the latest consumed off

最新推荐文章于 2023-03-29 09:39:17 发布
最新推荐文章于 2023-03-29 09:39:17 发布 14758

1.业务背景:

kafka 使用的事2.11_0.10.0.1

在做及时通讯IM的时候,客户端A发完消息后会,使用kafka 通知下游服务(kafka 消费者consumer)进行给B端用户发送消息。

2.现象:

1.在某一时间

在某一时间是kafka consumer poll处理时间9.3 s ,poll这里逻辑用了线程池,只有最后提交offests的时候会有个synchronized,但是一般都是0.1ms的。

public void execute() {
        try {
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(200);
                StopWatch stopWatch = PerfUtlils.getWatcher();
                PerfCount.countMetric(MicrometerStat.STAT_KAFKA_CONSUMER, "kafka.consumer.poll.record.count");
                //1.先找出吧不同的topic 分区
                for (final TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition);
                    //2.对于同一个分区的不同record用独立线程处理
                    partitionRecords.forEach(record -> {
                        ConsumerWorker<T> worker = new ConsumerWorker<>(record, handler, offsets, partition);
                        worker.setFutureTask(executors.submit(worker));
                        log.info("consumer worker consume  1 message");
                //3.提交offsets
                commitAsyncOffsets();
                PerfCount.durationTimeMetric(MicrometerStat.STAT_KAFKA_CONSUMER, "kafka.consumer.poll.process", stopWatch);
        } catch (WakeupException e) {
            log.info("kafka poll get WakeupException");
        } finally {
            commitSyncOffsets();
            consumer.close();

2.这时候业务error 日志是

org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
 

3.kafka broker 日志:

3.原因:

博主设置的消费者 session.timeout.ms=8000 8s 

其中session.timeout.ms 的含义有两个(kafka 0.10.1.0之前):

  1. 是consumer group 检测组内成员发送奔溃的时间,回个某个group 成员突然崩溃(比如kill -9 或者宕机),group coordinator 有可能需要session.timeout.ms 时间感应到。
  2. 是consumer 消费处理逻辑的最大时间,倘或consumer 两次poll的时间间隔超过该参数所设的阈值,那么group coordinator 入会认为此consumer跟不上组内其他成员的消费进度并将其”踢出”组中,会进行rebalane    

上面文字来自《Apache kafka 实战》

很明显博主的情况属于第二种,这个被”踢出”组中会无法提交位移,之就会造成这些消息会被其他程序消费,但此时刚好这个consumer会再次提交这个offests所有会出现上面的bug。

4.解决办法:

 1.将session.timeout.ms 设置大些。

Offset commit failed with a retriable exception. You should retry committing the latest consumed off 消费时offset被重置导致重复消费1.业务背景:kafka 使用的事2.11_0.10.0.1在做及时通讯IM的时候,客户端A发完消息后会,使用kafka 通知下游服务(kafka 消费者consumer)进行给B端用户发送消息。2.现象:1.在某一时间在某一时间是kafka consumer poll处理时间9.3 s ,poll这里逻辑用了线程池,只有最后提交of... commit failed with a retriable exception. You should retry committing offsets. [2020-09-05 15:05:12] [WARN] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:626] Auto-commit of offsets {com.dbapp.topic.rawevent-2=OffsetAndMetadata{of. 在实际应用的过程中,发现偶尔会有消费数据报:org.apache.kafka.clients.consumer.CommitFailedException异常 原因分析: 提示:这里填写问题的分析: 解决方案: 提示:这里填写该问题的具体解决方案: 例如:新建一个 Message 对象,并将读取到的数据存入 Message,然后 mHandler.obtainMessage(READ_
在进行消费者正常消费过程中以及Rebalance操作之前,都会提交一次offset记录Consumer当前的消费位置。提交offset的功能是由ConsumerCoordinator完成的。 在SubscriptionState中使用TopicPartitionState记录每个TopicPartition的消费状况,TopicPartitionState.position字段则记录了消费者下次要...
1、在Vue项目中引用公共方法 作为一个新人小白,在使用vue的过程中,难免会遇到很多的问题,比如某个方法在很多组件中都能用的上,如果在每个组件上都去引用一次的话,会比较麻烦,增加代码量。怎么做比较好呢,话不多说直接看代码把 首先 要在main.js中引入公共js。然后,将方法赋在Vue的原型链上。 像图中这样。 然后在需要的组件上去引入这个方法 mouted (){ //调用方法 this.common.login(); /**然后公共方法里写一段简单的代码*/ export default{ login:function(){ console.log('这是一段代码')