Spring for Apache Kafka
https://spring.io/projects/spring-kafka

Spring Kafka 2.x: producing and consuming messages, explained
https://juejin.im/entry/5a9e65296fb9a028dc408af5

Spring-Kafka Quick Start Configuration

Add the dependency

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

spring.kafka configuration

A basic integration needs only the following core configuration:

  • Kafka broker addresses
  • Serializer and deserializer classes
  • The package containing the message payload added to the trusted packages

    spring:
      kafka:
        bootstrap-servers: localhost:9092  # Kafka broker addresses; multiple addresses may be given, comma-separated
        consumer:
          group-id: group-my
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
          properties:
            spring.json.trusted.packages: "com.masikkk.*"
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

Sending Messages

Inject a KafkaTemplate to send messages:

    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class KafkaProducer {
        private final KafkaTemplate<String, Object> kafkaTemplate;

        public void publish(Message message) {
            kafkaTemplate.send("masikkk-topic", message.getPartitionKey(), message);
        }
    }
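
If delivery results matter, a callback can be attached to the returned future. A minimal sketch, placed inside the publish method above and assuming the spring-kafka 2.x API where send() returns a ListenableFuture (newer versions return a CompletableFuture instead):

    kafkaTemplate.send("masikkk-topic", message.getPartitionKey(), message)
            .addCallback(
                    // Invoked once the broker acknowledges the record
                    result -> log.info("Sent to partition {}", result.getRecordMetadata().partition()),
                    // Invoked if the send ultimately fails after retries
                    ex -> log.error("Send failed", ex));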

Receiving Messages

Annotate a method with @KafkaListener to receive messages:

    @Slf4j
    @Component
    public class KafkaConsumer {
        @KafkaListener(topics = {"topic1", "topic2"}, concurrency = "12")
        public void handle(MessageVO messageVO) {
            log.info("Received message: {}", JsonMappers.Normal.toJson(messageVO));
        }
    }

Spring-Kafka Configuration

Spring-Kafka Producer Default Configuration

org.apache.kafka.clients.producer.ProducerConfig in the kafka-clients package documents every option; its static block holds the default values.

    retries: 3 # When greater than 0, failed sends are retried this many times. Default: 2147483647
    acks: 1 # 0 = no acknowledgement; 1 = leader acknowledges; all = all in-sync replicas acknowledge. Default: 1

Producer defaults printed in the log at Spring Boot startup:

    2022-08-23 09:44:11.744 [http-nio-8778-exec-8] INFO  org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
            acks = 1
            batch.size = 16384
            bootstrap.servers = [kafka-service:9092]
            buffer.memory = 33554432
            client.dns.lookup = use_all_dns_ips
            client.id = producer-1
            compression.type = none
            connections.max.idle.ms = 540000
            delivery.timeout.ms = 120000
            enable.idempotence = false
            interceptor.classes = []
            internal.auto.downgrade.txn.commit = true
            key.serializer = class org.apache.kafka.common.serialization.StringSerializer
            linger.ms = 0
            max.block.ms = 60000
            max.in.flight.requests.per.connection = 5
            max.request.size = 1048576
            metadata.max.age.ms = 300000
            metadata.max.idle.ms = 300000
            metric.reporters = []
            metrics.num.samples = 2
            metrics.recording.level = INFO
            metrics.sample.window.ms = 30000
            partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
            receive.buffer.bytes = 32768
            reconnect.backoff.max.ms = 1000
            reconnect.backoff.ms = 50
            request.timeout.ms = 30000
            retries = 2147483647
            retry.backoff.ms = 100
            sasl.client.callback.handler.class = null
            sasl.jaas.config = null
            sasl.kerberos.kinit.cmd = /usr/bin/kinit
            sasl.kerberos.min.time.before.relogin = 60000
            sasl.kerberos.service.name = null
            sasl.kerberos.ticket.renew.jitter = 0.05
            sasl.kerberos.ticket.renew.window.factor = 0.8
            sasl.login.callback.handler.class = null
            sasl.login.class = null
            sasl.login.refresh.buffer.seconds = 300
            sasl.login.refresh.min.period.seconds = 60
            sasl.login.refresh.window.factor = 0.8
            sasl.login.refresh.window.jitter = 0.05
            sasl.mechanism = GSSAPI
            security.protocol = PLAINTEXT
            security.providers = null
            send.buffer.bytes = 131072
            socket.connection.setup.timeout.max.ms = 127000
            socket.connection.setup.timeout.ms = 10000
            ssl.cipher.suites = null
            ssl.enabled.protocols = [TLSv1.2]
            ssl.endpoint.identification.algorithm = https
            ssl.engine.factory.class = null
            ssl.key.password = null
            ssl.keymanager.algorithm = SunX509
            ssl.keystore.certificate.chain = null
            ssl.keystore.key = null
            ssl.keystore.location = null
            ssl.keystore.password = null
            ssl.keystore.type = JKS
            ssl.protocol = TLSv1.2
            ssl.provider = null
            ssl.secure.random.implementation = null
            ssl.trustmanager.algorithm = PKIX
            ssl.truststore.certificates = null
            ssl.truststore.location = null
            ssl.truststore.password = null
            ssl.truststore.type = JKS
            transaction.timeout.ms = 60000
            transactional.id = null
            value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
    

Spring-Kafka Consumer Default Configuration

org.apache.kafka.clients.consumer.ConsumerConfig in the kafka-clients package documents every option; its static block holds the default values.

Consumer defaults printed in the log at Spring Boot startup:

    2022-08-23 09:43:23.370 [main] INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
            allow.auto.create.topics = true
            auto.commit.interval.ms = 5000
            auto.offset.reset = latest
            bootstrap.servers = [kafka-service:9092]
            check.crcs = true
            client.dns.lookup = use_all_dns_ips
            client.id = consumer-group-mc-1
            client.rack =
            connections.max.idle.ms = 540000
            default.api.timeout.ms = 60000
            enable.auto.commit = false
            exclude.internal.topics = true
            fetch.max.bytes = 52428800
            fetch.max.wait.ms = 500
            fetch.min.bytes = 1
            group.id = group-mc
            group.instance.id = null
            heartbeat.interval.ms = 3000
            interceptor.classes = []
            internal.leave.group.on.close = true
            internal.throw.on.fetch.stable.offset.unsupported = false
            isolation.level = read_uncommitted
            key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
            max.partition.fetch.bytes = 1048576
            max.poll.interval.ms = 300000
            max.poll.records = 500
            metadata.max.age.ms = 300000
            metric.reporters = []
            metrics.num.samples = 2
            metrics.recording.level = INFO
            metrics.sample.window.ms = 30000
            partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
            receive.buffer.bytes = 65536
            reconnect.backoff.max.ms = 1000
            reconnect.backoff.ms = 50
            request.timeout.ms = 30000
            retry.backoff.ms = 100
            sasl.client.callback.handler.class = null
            sasl.jaas.config = null
            sasl.kerberos.kinit.cmd = /usr/bin/kinit
            sasl.kerberos.min.time.before.relogin = 60000
            sasl.kerberos.service.name = null
            sasl.kerberos.ticket.renew.jitter = 0.05
            sasl.kerberos.ticket.renew.window.factor = 0.8
            sasl.login.callback.handler.class = null
            sasl.login.class = null
            sasl.login.refresh.buffer.seconds = 300
            sasl.login.refresh.min.period.seconds = 60
            sasl.login.refresh.window.factor = 0.8
            sasl.login.refresh.window.jitter = 0.05
            sasl.mechanism = GSSAPI
            security.protocol = PLAINTEXT
            security.providers = null
            send.buffer.bytes = 131072
            session.timeout.ms = 10000
            socket.connection.setup.timeout.max.ms = 127000
            socket.connection.setup.timeout.ms = 10000
            ssl.cipher.suites = null
            ssl.enabled.protocols = [TLSv1.2]
            ssl.endpoint.identification.algorithm = https
            ssl.engine.factory.class = null
            ssl.key.password = null
            ssl.keymanager.algorithm = SunX509
            ssl.keystore.certificate.chain = null
            ssl.keystore.key = null
            ssl.keystore.location = null
            ssl.keystore.password = null
            ssl.keystore.type = JKS
            ssl.protocol = TLSv1.2
            ssl.provider = null
            ssl.secure.random.implementation = null
            ssl.trustmanager.algorithm = PKIX
            ssl.truststore.certificates = null
            ssl.truststore.location = null
            ssl.truststore.password = null
            ssl.truststore.type = JKS
            value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer
    

Accessing KafkaProperties

Inject KafkaProperties into any component to read the Spring-Kafka configuration; this bean is auto-configured whenever Spring-Kafka is configured. For example:

    @Configuration
    public class KafkaConfiguration {
        @Autowired
        private KafkaProperties kafkaProperties;
    }
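
From the injected properties, the merged property maps can then be built directly, e.g. inside a bean method:

    // Merged property maps derived from the spring.kafka configuration
    Map<String, Object> consumerProps = kafkaProperties.buildConsumerProperties();
    Map<String, Object> producerProps = kafkaProperties.buildProducerProperties();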

Concurrent Consumption

Add concurrency = "12" to @KafkaListener, or configure:

    spring:
      kafka:
        listener:
          # Number of concurrent consumer threads per listener. Default: 1
          concurrency: 12
    

Consuming with the ConsumerRecord Class

The advantage of receiving messages as a ConsumerRecord: it carries the partition information, message headers, message body, and more. If the business logic needs any of these, ConsumerRecord is a good choice.

    @KafkaListener(topics = "xxxx")
    public void consumerListener(ConsumerRecord<Integer, String> record) {
        log.info("receive : " + record.toString());
    }
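
For instance, the individual fields can be read like this inside the listener (the "traceId" header name is only an illustration):

    // Partition metadata carried by the record
    log.info("topic={} partition={} offset={}", record.topic(), record.partition(), record.offset());
    // Key and message body
    Integer key = record.key();
    String body = record.value();
    // A specific header (org.apache.kafka.common.header.Header); the name is hypothetical
    Header traceId = record.headers().lastHeader("traceId");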

Automatically Creating/Updating Topics

Kafka auto-creates topics by default

Kafka creates topics automatically by default: in the broker's server.properties, auto.create.topics.enable defaults to true, meaning a topic is created automatically when it does not exist. The partition count is taken from num.partitions in server.properties, which defaults to 1 and applies whenever a topic is created without an explicit partition count.

Creating/updating topics with NewTopic beans at Spring startup

Spring creates the topic automatically at startup. If the topic already exists with a different partition count, the partition count can be adjusted automatically, but only increased, never decreased.

    @Configuration
    public class KafkaConfiguration {
        @Bean
        public NewTopic newTopic() {
            // Create the topic with 12 partitions and 1 replica
            return new NewTopic("my-topic", 12, (short) 1);
        }
    }

Registering NewTopic beans in bulk to create topics

The blog below shows how to configure multiple topics and register NewTopic beans into the GenericWebApplicationContext in a loop, which is handy when there are many topics; a sketch of the idea follows the link:

Creating topics with Spring Boot + Kafka
https://juejin.cn/post/7071055307094360100
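
A minimal sketch of that looping registration, assuming the topic names come from a hypothetical app.kafka.topics property:

    import java.util.List;
    import lombok.RequiredArgsConstructor;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import org.springframework.web.context.support.GenericWebApplicationContext;

    @Component
    @RequiredArgsConstructor
    public class TopicRegistrar implements InitializingBean {
        private final GenericWebApplicationContext context;

        // Hypothetical property: app.kafka.topics=topic1,topic2,topic3
        @Value("${app.kafka.topics}")
        private List<String> topics;

        @Override
        public void afterPropertiesSet() {
            // Register one NewTopic bean per configured topic; KafkaAdmin creates them at startup
            topics.forEach(topic -> context.registerBean(
                    topic, NewTopic.class, () -> new NewTopic(topic, 12, (short) 1)));
        }
    }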

Error Handling

Handling consumption exceptions

1. Provide a ConsumerAwareListenerErrorHandler that handles the message whose consumption failed:

    @Slf4j
    @Configuration
    public class KafkaConfiguration {
        @Bean
        public ConsumerAwareListenerErrorHandler listenerErrorHandler() {
            return (message, exception, consumer) -> {
                log.error("Kafka consumption failed: {}", message.getPayload(), exception);
                return null;
            };
        }
    }

2. Configure it in the errorHandler attribute of @KafkaListener; when the handle method throws an exception, control enters the ConsumerAwareListenerErrorHandler:

    @KafkaListener(topics = {"topic1", "topic2"}, errorHandler = "listenerErrorHandler")
    public void handle(MessageVO messageVO) {
    }

Handling Poison Pills with ErrorHandlingDeserializer

A ConsumerAwareListenerErrorHandler only covers exceptions thrown during consumption; if deserialization fails, the @KafkaListener method is never entered, so the handler cannot help.
When the consumer fails to deserialize a record, you get the poison pill phenomenon: the consumer is stuck in a deserialize-fail-retry loop, never reaches the @KafkaListener method, cannot process any subsequent messages, burns CPU and I/O, and can write tens of gigabytes of logs to disk in a very short time. The damage is severe.

Problem: the consumer fails to deserialize and floods the log:

    2022-08-23 18:25:25.370 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.error:149 - Consumer exception
    java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
        at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194)
    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition alert-push-topic-5.4-0 at offset 0. If needed, please seek past the record to continue consumption.
    Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
    

Solution: use ErrorHandlingDeserializer to handle deserialization failures. Configure both the consumer's key-deserializer and value-deserializer as org.springframework.kafka.support.serializer.ErrorHandlingDeserializer, and delegate to the concrete key and value deserializers:

    spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
    spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
    

When key or value deserialization fails, ErrorHandlingDeserializer makes sure the poison pill message is dealt with and logged, and the consumer offset can move forward so the consumer keeps processing subsequent messages.

Internally, ErrorHandlingDeserializer first deserializes with the delegate and catches any exception:

    public class ErrorHandlingDeserializer<T> implements Deserializer<T> {
        @Override
        public T deserialize(String topic, Headers headers, byte[] data) {
            try {
                return this.delegate.deserialize(topic, headers, data);
            }
            catch (Exception e) {
                deserializationException(headers, data, e);
                return recoverFromSupplier(topic, headers, data, e);
            }
        }
    }

Full configuration:

    spring:
      kafka:
        consumer:
          group-id: group-my
          key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          properties:
            spring.json.trusted.packages: "com.masikkk.*"
            spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
            spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        listener:
          concurrency: 12
    

AckMode Message Acknowledgement

RECORD: commit after each record is processed
BATCH: (default) commit once per poll, so the frequency depends on how often poll is called
TIME: commit every ackTime interval (how is this different from auto.commit.interval.ms?)
COUNT: commit after ackCount acknowledgements have accumulated
COUNT_TIME: commit when either the ackTime or the ackCount condition is met, whichever comes first
MANUAL: the listener is responsible for acking, but behind the scenes commits still go up in batches
MANUAL_IMMEDIATE: the listener is responsible for acking; every call commits immediately
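
For the two manual modes, the listener receives an Acknowledgment and commits explicitly. A minimal sketch, assuming the container is configured with AckMode.MANUAL_IMMEDIATE (e.g. spring.kafka.listener.ack-mode: manual_immediate):

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;

    @Component
    public class ManualAckConsumer {
        @KafkaListener(topics = "my-topic")
        public void handle(String payload, Acknowledgment ack) {
            // ... process the message ...
            ack.acknowledge();  // under MANUAL_IMMEDIATE this commits the offset right away
        }
    }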

On Spring for Kafka's AckMode
https://juejin.im/post/59e0528df265da43133c2ab5

spring-kafka's Automatic Offset Commit Mechanism

When enable.auto.commit is false, spring-kafka's own offset commit mechanism is used.
When enable.auto.commit is true, Kafka's default commit mode is used.

spring-kafka checks whether enable.auto.commit is false. When it is, spring-kafka starts an invoker whose job is to launch a thread that consumes the data. That thread does not read directly from Kafka; it reads from a blocking queue that spring-kafka creates itself.
spring-kafka then enters a loop: the source shows that with auto commit disabled, it first commits the offsets of the records processed so far, then fetches new records from Kafka.
The fetched records are pushed onto the blocking queue for the consuming thread; if the queue is full and the records do not fit, spring-kafka calls Kafka's pause method so the consumer stops fetching further data from Kafka.

Recommendation:
When using spring-kafka, set the kafka-client's enable.auto.commit to false, disabling the client's own offset commits in favor of spring-kafka's commit mechanism; a sketch follows.
We once hit a case where consumption timed out, the automatic commit kept failing, and the offset was never updated. spring-kafka offers several commit strategies that can still commit offsets even when a batch has not been fully consumed, avoiding the endless re-consumption caused by commits that never succeed.
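
A minimal sketch of that recommendation in code, building the consumer factory from the injected KafkaProperties as in the factory examples elsewhere on this page (the class name is illustrative):

    import java.util.Map;
    import lombok.RequiredArgsConstructor;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    @Configuration
    @RequiredArgsConstructor
    public class KafkaCommitConfiguration {
        private final KafkaProperties kafkaProperties;

        @Bean
        public ConsumerFactory<Object, Object> consumerFactory() {
            Map<String, Object> props = kafkaProperties.buildConsumerProperties();
            // Disable the kafka-client auto commit so spring-kafka's commit strategies take over
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            return new DefaultKafkaConsumerFactory<>(props);
        }
    }

The same effect can be had purely in configuration with spring.kafka.consumer.enable-auto-commit: false.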

Analysis and notes on using the spring-kafka consumer
https://juejin.im/entry/5a6e8dea518825732472710c

Approaches for handling very low Kafka consumer throughput
https://www.jianshu.com/p/4e00dff97f39

Manually Configuring the KafkaListenerContainerFactory

Before a listener container (ListenerContainer) can be created, a listener container factory (ListenerContainerFactory) is needed. By default, Spring configures one from the spring.kafka settings in application.properties/yml.

Using a property to control whether the Kafka connection is enabled

Problem:
Once Spring-Kafka is configured, the service connects to Kafka at startup, and if the broker is unreachable it keeps logging warnings, even though the service itself still works:

    2022-08-24 15:38:50.437 [kafka-admin-client-thread | adminclient-1] WARN  org.apache.kafka.clients.NetworkClient.processDisconnection:782 - [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9093) could not be established. Broker may not be available.
    

Customize the KafkaListenerContainerFactory so the Kafka listener is only enabled when app.kafkaEnabled=true.
If no Kafka server is running in the environment, set app.kafkaEnabled=false so the service does not connect to the broker and the constant warnings disappear.

    @Slf4j
    @Configuration
    @RequiredArgsConstructor
    @ConditionalOnProperty(prefix = "app", name = "kafkaEnabled", havingValue = "true")
    public class KafkaConfiguration {
        private final KafkaProperties kafkaProperties;
        public static final String CONTAINER_NAME = "my-container";

        @Bean(CONTAINER_NAME)
        public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
            return factory;
        }

        @Bean
        public KafkaConsumer kafkaConsumer() {
            return new KafkaConsumer();
        }
    }

If @KafkaListener does not specify a containerFactory, Spring Boot injects the containerFactory bean named kafkaListenerContainerFactory by default; here we reference the custom one explicitly:

    public class KafkaConsumer {
        @KafkaListener(topics = {"topic1"}, containerFactory = KafkaConfiguration.CONTAINER_NAME)
        public void handle(Message message) {
        }
    }

Spring-Kafka / Apache Kafka / kafka-clients Version Compatibility

Compatibility matrix of spring-kafka and the corresponding kafka-clients versions (the middle column is the matching Spring Integration for Apache Kafka version):

    Spring for Apache Kafka    Spring Integration for Apache Kafka    kafka-clients
    2.3.x                      3.2.x                                  2.3.1
    2.2.x                      3.1.x                                  2.0.1, 2.1.x, 2.2.x
    2.1.x                      3.0.x                                  1.0.x, 1.1.x, 2.0.0
    1.3.x                      2.3.x                                  0.11.0.x, 1.0.x

The version we use:

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>1.3.1.RELEASE</version>
    </dependency>
    

Spring KafkaTemplate Configuration Class Example

    @EnableKafka
    @Configuration
    public class KafkaConfiguration {
        @Value("${kafka.bootstrap.servers}")
        private String brokers;

        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return props;
        }

        public ProducerFactory<String, Object> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        @Bean
        public KafkaTemplate<String, Object> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }

Problems

The class 'com.xxx.Message' is not in the trusted packages

Problem: Kafka consumption fails with:

    2022-08-22 18:41:12.194 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.error:149 - Consumer exception
    java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
        at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194)
    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition structured-data-push-topic-5 at offset 0. If needed, please seek past the record to continue consumption.
    Caused by: java.lang.IllegalArgumentException: The class 'com.xxx.Message' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
        at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126)
    

Solution:
Add the package containing the message payload VO to the trusted packages:

    spring:
      kafka:
        consumer:
          properties:
            spring.json.trusted.packages: "com.masikkk.*"
    

    https://stackoverflow.com/questions/51688924/spring-kafka-the-class-is-not-in-the-trusted-packages

Dynamically Creating Consumers

Dynamically creating Kafka consumers (updating topics and servers at runtime)
https://blog.csdn.net/weixin_41422086/article/details/104849127
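
A rough sketch of the idea, assuming the consumer properties come from KafkaProperties and the topic name arrives at runtime (class and method names are illustrative):

    import java.util.Map;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    import org.springframework.kafka.listener.ContainerProperties;
    import org.springframework.kafka.listener.MessageListener;

    public class DynamicConsumerFactory {

        /** Creates and starts a listener container for a topic chosen at runtime. */
        public ConcurrentMessageListenerContainer<String, String> createConsumer(
                Map<String, Object> consumerProps, String topic) {
            ContainerProperties containerProps = new ContainerProperties(topic);
            // The listener invoked for each record
            containerProps.setMessageListener((MessageListener<String, String>) record ->
                    System.out.println("received: " + record.value()));
            ConcurrentMessageListenerContainer<String, String> container =
                    new ConcurrentMessageListenerContainer<>(
                            new DefaultKafkaConsumerFactory<>(consumerProps), containerProps);
            container.start();
            return container;
        }
    }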

1. Spring-Kafka Quick Start Configuration
  1. Add the dependency
  2. spring.kafka configuration
  3. Sending Messages
  4. Receiving Messages
2. Spring-Kafka Configuration
  1. Spring-Kafka Producer Default Configuration
  2. Spring-Kafka Consumer Default Configuration
  3. Accessing KafkaProperties
  4. Concurrent Consumption
  5. Consuming with the ConsumerRecord Class
  6. Automatically Creating/Updating Topics
    1. Kafka auto-creates topics by default
    2. Creating/updating topics with NewTopic beans at Spring startup
    3. Registering NewTopic beans in bulk to create topics
  7. Error Handling
    1. Handling consumption exceptions
    2. Handling Poison Pills with ErrorHandlingDeserializer
  8. AckMode Message Acknowledgement
  9. spring-kafka's Automatic Offset Commit Mechanism
  10. Manually Configuring the KafkaListenerContainerFactory
    1. Using a property to control whether the Kafka connection is enabled
3. Spring-Kafka / Apache Kafka / kafka-clients Version Compatibility
4. Spring KafkaTemplate Configuration Class Example
5. Problems
  1. The class 'com.xxx.Message' is not in the trusted packages
6. Dynamically Creating Consumers