Kafka
是一个常见的分布式消息队列,常用于大规模数据流式处理。在使用
Kafka
时,一个常见的问题是:消费者消费了消息后,消息是否会从
Kafka
中移除?
在
Kafka
中,消息是以分区(Partition)的形式被保存。消息在分区中是有序且不可变的,消费者消费消息时,实际上是从分区中读取消息。消费者消费消息后,
Kafka
并不会立即将消息从分区中移除,而是将已消费的消息的偏移量(Offset)记录在
消费者
组(Consumer Group)中,以便
消费者
组中的其他
消费者
继续消费该分区中的消息时,不会消费已经被其他
消费者
消费过的
消息
。
如果一个
消费者
组中的所有
消费者
都已经消费了一个分区中的所有消息,
Kafka
会将该分区的偏移量重置为0,此时该分区中的所有消息都可以被重新消费。如果某些
消费者
在该分区中没有消费完所有消息就退出了
消费者
组,后来加入该
消费者
组的
消费者
仍然可以继续消费该分区中未消费的
消息
。
下面是一个示例代码,展示了如果一个
消费者
组中的所有
消费者
都已经消费了一个分区中的所有
消息
,重置偏移量为0的过程:
import org.apache.kafka.clients.consumer.*;
import java.util.*;
public class KafkaConsumerExample {
private static String topic = "test-topic";
private static String group = "test-group";
private static String brokers = "localhost:9092";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Consumer Group: %s, Partition: %d, Offset: