一、偏移量提交

消费者提交偏移量的主要是消费者往一个名为 _consumer_offset 的特殊主题发送消息,消息中包含每个分区的偏移量。

如果消费者一直运行,偏移量的提交并不会产生任何影响。但是如果有消费者发生崩溃,或者有新的消费者加入消费者群组的时候,会触发 Kafka 的 再均衡 。这使得 Kafka 完成再均衡之后,每个消费者可能被会分到新分区中。为了能够继续之前的工作,消费者就需要读取每一个分区的最后一次提交的偏移量,然后从偏移量指定的地方继续处理。

但是这样可能会出现如下的问题。

1.1 提交偏移量小于客户端处理的偏移量

如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。

因此,如果处理偏移量,会对客户端处理数据产生影响。KafkaConsumer API 提供了很多种方式来提交偏移量。

二、自动提交

自动提交是 Kafka 处理偏移量最简单的方式。

当 enable.auto.commit 属性被设为 true,那么每过 5s,消费者会自动把从 poll()方法接收到的最大偏移量提交上去。这是因为提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。与消费者里的其他东西一样,自动提交也是在轮询里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。

但是使用这种方式,容易出现 提交的偏移量小于客户端处理的最后一个消息的偏移量 这种情况的问题。假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s(因为没有达到5s的时限,并没有提交偏移量),所以在这 3s 的数据将会被重复处理。

虽然可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗的时间跨度,不过这种情况是无法完全避免的。

在使用自动提交时,每次调用轮询方法都会把上一次调用返回的偏移量提交上去,它并不知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(在调用 close() 方法之前也会进行自动提交)。一般情况下不会有什么问题,不过在处理异常或提前退出轮询时要格外小心。

三、手动提交

大部分开发者通过控制偏移量提交时间来消除丢失消息的可能性,并在发生再均衡时减少重复消息的数量。消费者 API 提供了另一种提交偏移量的方式,开发者可以在必要的时候提交当前偏移量,而不是基于时间间隔。

这是我们需要把把 auto.commit.offset 设为 false,让应用程序决定何时提交偏移量。

3.1 同步提交

使用 commitSync() 提交偏移量最简单也最可靠。这个 API 会提交由 poll() 方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。

代码示例如下:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("topic = %s, partition = %s, offset =
          %d, customer = %s, country = %s\n",
             record.topic(), record.partition(),
                  record.offset(), record.key(), record.value()); 
        try {
          consumer.commitSync(); 
        } catch (CommitFailedException e) {
            log.error("commit failed", e) 

commitSync() 将会提交由 poll() 返回的最新偏移量,所以在处理完所有记录后要确保调用了 commitSync(),否则还是会有丢失消息的风险。如果发生了再均衡,从最近一批消息到发生再均衡之间的所有消息都将被重复处理。

同时在这个程序中,只要没有发生不可恢复的错误,commitSync() 方法会一直尝试直至提交成功。如果提交失败,我们也只能把异常记录到错误日志里。

3.2 异步提交

同步提交有一个不足之处,在 broker 对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。我们可以通过降低提交频率来提升吞吐量,但如果发生了再均衡,会增加重复消息的数量。

这个时候可以使用异步提交 API。我们只管发送提交请求,无需等待 broker 的响应。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    consumer.commitAsync(); 

在成功提交或碰到无法恢复的错误之前,commitSync() 会一直重试,但是 commitAsync() 不会,这也是 commitAsync() 不好的一个地方。

它之所以不进行重试,是因为在它收到服务器响应的时候,可能有一个更大的偏移量已经提交成功。假设我们发出一个请求用于提交偏移量 2000,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000。如果 commitAsync() 重新尝试提交偏移量 2000,它有可能在偏移量 3000 之后提交成功。这个时候如果发生再均衡,就会出现重复消息。

commitAsync() 也支持回调,在 broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标。如果要用它来进行重试,则一定要注意提交的顺序。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition,
        OffsetAndMetadata> offsets, Exception e) {
            if (e != null)
                log.error("Commit failed for offsets {}", offsets, e);

3.3 同步和异步混合提交

一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。

但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。因此在这种情况下,我们应该考虑使用混合提交的方法:

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("topic = %s, partition = %s, offset = %d,
            customer = %s, country = %s\n",
            record.topic(), record.partition(),
            record.offset(), record.key(), record.value());
        consumer.commitAsync(); 
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(); 
    } finally {
        consumer.close();
  1. 在程序正常运行过程中,我们使用 commitAsync 方法来进行提交,这样的运行速度更快,而且就算当前提交失败,下次提交成功也可以。
  2. 如果直接关闭消费者,就没有所谓的“下一次提交”了,因为不会再调用poll()方法。使用 commitSync() 方法会一直重试,直到提交成功或发生无法恢复的错误。

3.4 提交特定的偏移量

如果 poll() 方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息,想要在批次中间提交偏移量该怎么办?这种情况无法通过调用 commitSync() 或 commitAsync() 来实现,因为它们只会提交最后一个偏移量,而此时该批次里的消息还没有处理完。

这时候需要使用一下的两个方法:

* Commit the specified offsets for the specified list of topics and partitions. @Override public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) * Commit the specified offsets for the specified list of topics and partitions to Kafka. @Override public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

消费者 API 允许在调用 commitSync() 和 commitAsync() 方法时传进去希望提交的分区和偏移量的 map。

假设处理了半个批次的消息,最后一个来自主题“customers”分区 3 的消息的偏移量是 5000,你可以调用 commitSync() 方法来提交它。不过,因为消费者可能不只读取一个分区,你需要跟踪所有分区的偏移量,所以在这个层面上控制偏移量的提交会让代码变复杂。

代码如下:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets =
    new HashMap<>(); 
int count = 0;
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("topic = %s, partition = %s, offset = %d,
        customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value()); 
        currentOffsets.put(new TopicPartition(record.topic(),
        record.partition()), new
        OffsetAndMetadata(record.offset()+1, "no metadata")); 
        if (count % 1000 == 0) 
            consumer.commitAsync(currentOffsets,null); 
        count++;

这里调用的是 commitAsync(),不过调用commitSync()也是完全可以的。在提交特定偏移量时,仍然要处理可能发生的错误。

四、监听再均衡

如果 Kafka 触发了再均衡,我们需要在消费者失去对一个分区的所有权之前提交最后一个已处理记录的偏移量。如果消费者准备了一个缓冲区用于处理偶发的事件,那么在失去分区所有权之前,需要处理在缓冲区累积下来的记录。可能还需要关闭文件句柄、数据库连接等。

在为消费者分配新分区或移除旧分区时,可以通过消费者 API 执行一些应用程序代码,在调用 subscribe() 方法时传进去一个 ConsumerRebalanceListener 实例就可以了。 ConsumerRebalanceListener 有两个需要实现的方法。

  1. public void onPartitionsRevoked(Collection partitions) 方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。
  2. public void onPartitionsAssigned(Collection partitions) 方法会在重新分配分区之后和消费者开始读取消息之前被调用。

下面的例子将演示如何在失去分区所有权之前通过 onPartitionsRevoked() 方法来提交偏移量。

private Map<TopicPartition, OffsetAndMetadata> currentOffsets=
  new HashMap<>();
private class HandleRebalance implements ConsumerRebalanceListener { 
    public void onPartitionsAssigned(Collection<TopicPartition>
      partitions) { 
    public void onPartitionsRevoked(Collection<TopicPartition>
      partitions) {
        System.out.println("Lost partitions in rebalance.
          Committing current
        offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets); 
try {
    consumer.subscribe(topics, new HandleRebalance()); 
    while (true) {
        ConsumerRecords<String, String> records =
          consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.println("topic = %s, partition = %s, offset = %d,
             customer = %s, country = %s\n",
             record.topic(), record.partition(), record.offset(),
             record.key(), record.value());
             currentOffsets.put(new TopicPartition(record.topic(),
             record.partition()), new
             OffsetAndMetadata(record.offset()+1, "no metadata"));
        consumer.commitAsync(currentOffsets, null);
} catch (WakeupException e) {
    // 忽略异常,正在关闭消费者
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(currentOffsets);
    } finally {
        consumer.close();
        System.out.println("Closed consumer and we are done");

如果发生再均衡,我们要在即将失去分区所有权时提交偏移量。要注意,提交的是最近处理过的偏移量,而不是批次中还在处理的最后一个偏移量。因为分区有可能在我们还在处理消息的时候被撤回。我们要提交所有分区的偏移量,而不只是那些即将失去所有权的分区的偏移量——因为提交的偏移量是已经处理过的,所以不会有什么问题。调用 commitSync() 方法,确保在再均衡发生之前提交偏移量。

Kafka - 偏移量提交一、偏移量提交消费者提交偏移量的主要是消费者往一个名为_consumer_offset的特殊主题发送消息,消息中包含每个分区的偏移量。如果消费者一直运行,偏移量的提交并不会产生任何影响。但是如果有消费者发生崩溃,或者有新的消费者加入消费者群组的时候,会触发 Kafka 的再均衡。这使得 Kafka 完成再均衡之后,每个消费者可能被会分到新分区中。为了能...
kafka-hadoop-loader 这个hadoop加载器为每个主题代理分区创建拆分,这在kafka steram和mapper任务之间创建了理想的并行度。 此外,它不使用高级使用者,而是直接与zookeeper通信以管理消耗的偏移量,消耗的偏移量在每个地图任务结束时提交,也就是说,当输出文件已从hdfs_temp移至其最终目的地时。 实际使用者及其内部提取程序线程都包装为KafkaInputContext,它是为每个Map Task的记录读取器对象创建的。 然后,映射器接收最不利的消息对,解析日期的内容并发出(date,message),然后由Output Format拾取并在hdfs级别上分区到其他位置。 HadoopJob -> KafkaInputFormat -> zkUtils.getBrokerPartitions
kafka位移提交 什么是位移提交,定义。位移提交的分类自动提交和手动提交:同步提交和异步提交 什么是位移提交,定义。   Consumer需要向kafka汇报自己的位移数据,这个汇报过程称为位移提交。因为Consumer可以同时消费多个分区。所以位移提交是按照分区的粒度进行的。即Consumer需要为分配给他的每个分区提交各自的位移数据。   作用是:位移提交表示了Consumer的消费进度。这样当Consumer发生故障重启后,就可以读取之前提交的位移,然后从指定位移开始继续消费。类似于书签。
1.问题背景 某服务(用了SpringBoot + spring-kafka)处理Kafka消息时,发现每条消息处理时间长达60+秒。几百条消息处理完后,又重新从第一条开始重复消费。 2.原因分析 Kafka消费者有两个配置参数: max.poll.interval.ms 两次poll操作允许的最大时间间隔。单位毫秒。默认值300000(5分钟)。 两次poll超过此时间间隔,Kafk...
1.自动提交: 1.属性enable.auto.commit -&amp;gt; true 2.属性auto.commit.interval.ms -&amp;gt;5000 默认提交时间间隔为5s 3.消费者会自动将poll()方法接收到的消息的最大偏移量提交上去 2.手动提交偏移量(分为两种) 1.同步的提交 2,异步的提交 3.同步方式提交偏移量 4.异步方式提交偏移量 假设一个分区中有10条消息,位移分别是0到9 其中consumer应用已消费了5条消息,就说明该consumer消费了位移为0到4的5条消息,此时Consumer位移是5,指向了下一条消息的位移。 consumer需要向Kafka汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets) Consumer能够同时消费多个分区的数据,所以位移的提交实际上是
一、日志简介 1.1 什么是日志 日志是带时间戳的基于时间序列的机器数据,包括IT系统信息(服务器、网络设备、操作系统、应用软件)、物联网各种传感器信息。日志反映用户行为,是事实数据,也是系统运维、故障诊断、性能分析的重要来源。对于任何系统,日志都是极其重要的组成部分。 1.2 日志处理的背景 随着大数据时代的来临,系统日志量也呈指数级增加。伴随着日志格式复杂度的增加、规模的扩大以及应用节点的增多...
工作中遇到过问题:包括数据Invalid Message和Failed_to_UNcompress等,会造成消费端的iterator损坏,导致消费进程挂掉,需要手动跳过某些数据; Kafka偏移量有保存在zookeeper和kafka中topic(_consumer_offset)2种方式; 1、修改保存在zookeeper中的偏移量: 使用./zkCli.sh -server xxxx...
一. 概述: 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能、高可用、可伸缩和最终一致性架构。使用较多的消息队列有Kafka、ActiveMQ、RabbitMQ、ZeroMQ、MetaMQ、RocketMQ。 消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景。 1、异步处理:用户注册后,需要发注册邮件和注册短信。 2、应用解耦:用户下单后,订单系统需要通知库存系统。 3、流量削锋:秒杀活动,一般会因为流量过大,导致流量暴增,应
用下面命令可以查询到topic:test broker:suna:9092的offset的最小值: bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 -topic test --time -2 test:0:1288 查询offset
Kafka 是一个高性能、分布式的消息队列,常用于处理大量的实时数据。Python-Kafka 是 Python 语言的 Kafka 客户端库,提供了丰富的 API 接口,可以方便地对 Kafka 进行操作。下面是一个 Python-Kafka 的实战案例: 1. 安装 Python-Kafka 库 使用 pip 安装 Python-Kafka 库: pip install kafka-python 2. 创建 Kafka 生产者 使用 Python-Kafka 库创建 Kafka 生产者,代码如下: ```python from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=['localhost:9092']) for i in range(10): producer.send('test', b'message {}'.format(i)) producer.close() 上述代码创建了一个 Kafka 生产者,并向名称为“test”的主题发送了 10 条消息。 3. 创建 Kafka 消费者 使用 Python-Kafka 库创建 Kafka 消费者,代码如下: ```python from kafka import KafkaConsumer consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092']) for msg in consumer: print(msg) consumer.close() 上述代码创建了一个 Kafka 消费者,并订阅了名称为“test”的主题。当 Kafka 生产者向该主题发送消息时,消费者将接收到消息并打印出来。 4. 手动提交消费偏移量 默认情况下,Kafka 消费者会自动提交消费偏移量,但在某些情况下需要手动提交。例如,在消费者处理消息之前需要进行一些预处理或验证操作时,可以先手动提交偏移量,再进行处理。代码如下: ```python from kafka import KafkaConsumer consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092'], enable_auto_commit=False) for msg in consumer: print(msg) consumer.commit() consumer.close() 上述代码创建了一个 Kafka 消费者,并禁用了自动提交消费偏移量的功能。在每次处理完消息后,需要手动提交偏移量。 5. 多线程消费 在实际应用中,可能需要启用多个消费者线程来提高消息处理效率。可以使用 Python 的 threading 模块创建多个线程,每个线程创建一个 Kafka 消费者来消费消息。代码如下: ```python from kafka import KafkaConsumer import threading def consume(): consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092']) for msg in consumer: print(msg) consumer.close() threads = [] for i in range(4): t = threading.Thread(target=consume) threads.append(t) t.start() for t in threads: t.join() 上述代码创建了 4 个消费者线程,每个线程创建一个 Kafka 消费者并消费消息。这样可以提高消息处理效率。 以上就是一个简单的 Python-Kafka 实战案例,通过该案例可以了解如何使用 Python-Kafka 库创建 Kafka 生产者和消费者,以及如何手动提交消费偏移量和使用多线程消费。