Kafka如何做到消息的不丢不重
消息的丢失与重复消费都会影响到系统间数据的一致性,从而造成生产问题。在使用Kafka的过程中如何做到消息的不丢失、不重复呢?也就是做到Exactly once(有且仅有一次)的语义保证。
首先,从上图可以看到,kafka的消息生命周期包括:Producer端消息发送,服务端接收消息备份存储,Consumer端消费消息等过程。要做到消息的不丢、不重需要从这三方面着手。
Producer端
消息发送API的选择
消息发送的两种API | producer.send(record) | producer.send(record, callback) |
---|---|---|
特点 | 发送后不管 | 通过回调函数确认消息发送成功,否则重新发送。 |
消息发送的两种API中,显然选用producer.send(record, callback)是最好的,如果发送后就不管了,可能会出现由于网络抖动因素,导致消息没有发送到Broker端,也可能由于消息不合格等原因,Broker端无法接收,最终造成消息丢失。
关于消息不合格的解释:
以下两个配置项控制了Broker接收消息的阈值,超过设置值Broker不能接收,所以需衡量消息的大小,合理设置这两个配置项。max.message.bytes控制精度更细,可以针对每个Topic设置正常接收消息最大值。
Broker Configs | Valid Values | Default | Meaning |
---|---|---|
message.max.bytes | [0,...] | 1048588 | 控制 Broker 能够接收的最大消息大小,默认值1M。 |
Topic-Level Configs | Valid Values | Default | Meaning |
---|---|---|
max.message.bytes | [0,...] | 1048588 | 决定了 Kafka Broker 能够正常接收该 Topic 的最大消息大小。 |
相关配置项的设置
Producer Configs | Valid Values | Default | Meaning |
---|---|---|
retries | [0,...,2147483647] | 0 | 设置大于零的值会使客户端重新发送任何因潜在的暂时性错误而失败的请求。 |
解释:
对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。建议将其设置为最大值。
Producer Configs | Valid Values | Default | Meaning |
---|---|---|
acks | [all, -1, 0, 1] | all | 该配置项控制着发送消息的持久性保证。 |
解释:
acks可以设置为0、1、-1(等价于all)
当配置acks=0 :Producer不会等待任何来自服务端的“已提交”回复,消息将直接发送,这种情况无法保证Broker端收到了消息。存在很大的消息丢失风险,比如网络抖动。
当配置acks=1 :leader将写消息到本地LOG,但是不会等待所有Follower的完全确认,就回复Producer“已接收”。因为消息完全备份到Follower,如果Leader所在的机器挂掉了,也会发生数据丢失。
当配置acks=-1: Leader要等待所有的同步副本(In-sync replicas ,ISR)集合全部接收消息后,再返回给Producer 确认通知。
关于同步副本(In-sync replicas ,ISR)集合的解释:
Leader与Followers之间的同步时存在延时的,Leader挂掉后需要从Followers中选出新的Leader,显然延时高的Follower去做Leader是不合理的,Kafka动态的维护着此同步副本(ISR)集合,消息接收延时高的follwer会被踢出该集合,只有该集合的成员才有资格被选举为领导者。此 ISR 集在发生更改时会持久保存到 ZooKeeper。 因此,ISR 中的任何副本都有资格被选为领导者。
显然acks=-1这种情况能够做到最好的消息不丢失。但是如果图2第三种场景中ISR集合所有机器都挂掉了呢?此时就要考虑下面这个配置项--unclean.leader.election.enable。
服务端
相关配置项的设置
Broker Configs | Default | Meaning |
---|---|---|
unclean.leader.election.enable | false | 指示是否启用不在 ISR 集中的副本作为最后的选择作为领导者。 |
解释:
假设那些保存数据比较多的副本都挂了,是否还进行Leader选举?
如果设置成 false,表示不让那些落后太多的副本竞选 Leader。这样做的后果是这个分区就不可用了,因为没有 Leader 了。
如果设置成 true,那么 Kafka 允许你从那些备份慢的副本中选一个出来当 Leader。这样做的后果是数据有可能就丢失了,因为这些副本保存的数据本来就不全。所以该配置项最好设置为false。
Broker Configs | Valid Values | Default | Meaning |
---|---|---|
min.insync.replicas | [1,...] | 1 | 控制的是消息至少要被写入到多少个副本才算是“已提交”。前提是当生产者将确认设置为“all”(或“-1”)时。 |
解释:
在使用acks设置为all时,所有的同步副本(ISR)集合全部接收消息后,就认为是“已提交”,现在设置了min.insync.replicas,相当于增加了对“已提交”的确认条件,那就是还需保证消息至少要被写入了多少个副本。
典型场景举例:消息副本数为3,min.insync.replicas 设置为 2,acks 设置为 all ,如果2个及以上的副本没有被写入,Producer将出现异常。
min.insync.replicas设置成大于 1 可以提升消息持久性,配合 acks 一起使用可以获得更大的持久性保证,从而更好的做到消息不丢失。
Consumer端
通过前面的工作,已经保证存储到服务端的消息不丢不重了,那在Consumer端如何保证消息不丢不重呢?
首先要明确Consumer自己决定何时、如何消费消息,通过 pull(拉)方式从服务端拉取消息。并且在Consumer端保存消费的具体位置(Consumer offset),即使Consumer宕机,在其恢复上线后,可以根据自己保存的位置重新拉取,从而保证消息不丢失。
由此Consumer端最主要的是要维护好Consumer offset,即消费者消费某Partition的哪个offset。参考图1场景,消费者组中的每个消费者实例去读取同一个Topic的不同Partition的消息,读取消息的位置可能不同,消费者程序应该做好维护。有如下几种方案可选。
位移提交方式 | 同步提交 | 异步提交 | 同步提交+异步提交 | |
---|---|---|---|---|
自动提交 | 方案1 | |||
手动提交 | 方案2 | 方案3 | 方案4 |
与自动提交相关的配置项
Consumer Configs | Default | Meaning |
---|---|---|
enable.auto.commit | true | 自动提交与手动提交控制开关 |
auto.commit.interval.ms | 5000 (5 seconds) | 如果 enable.auto.commit 设置为 true,消费者偏移量自动提交到 Kafka 的频率(以毫秒为单位)。 |
方案1: enable.auto.commit=true 自动提交
poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但可能会出现重复消费。
方案2: enable.auto.commit=false 手动提交,使用同步提交方法commitSync() 去提交Consumer offset
调用commitSync()方法的时机,是在你处理完了 poll() 方法返回的所有消息之后。
缺点: 调用 commitSync() 时,Consumer 程序会处于阻塞状态
方案3: enable.auto.commit=false 手动提交,使用异步提交方法commitAsync() 去提交Consumer offset
调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。
缺点:出现问题时它不会自动重试。异步提交的重试没有意义,提交失败后,自动重试时提交的位移值可能早已经过期。
方案4: enable.auto.commit=false ,同时采用同步提交和异步提交两种方式
该方案既不影响 TPS,又支持自动重试,改善 Consumer 应用的高可用性。同时做到了消息处理的不丢不重。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 异步提交
} catch(Exception e) {
handle(e); // 处理异常
} finally {