@Autowired
private KafkaListenerEndpointRegistry registry;
//定时器,每天凌晨0点开启监听
@Scheduled(cron = "0 0 0 * * ?")
public void startListener() {
log.info("开启监听");
//判断监听容器是否启动,未启动则将其启动
if (!registry.getListenerContainer("11111").isRunning()) {
registry.getListenerContainer("11111").start();
registry.getListenerContainer("11111").resume();
//定时器,每天早上10点关闭监听
@Scheduled(cron = "0 0 10 * * ?")
public void shutDownListener() {
log.info("关闭监听");
registry.getListenerContainer("11111").pause();
@KafkaListener注解方法参数汇总
@KafkaListener注解能够使用到如下8种方法上面。至于监听单条数据的前4种方法,与批量监听多条数据的后4种方法,那就要看你的kafka配置了。
@KafkaListener(....)
public void listen1(String data)
@KafkaListener(....)
public void listen2(ConsumerRecord<K,V> data)
@KafkaListener(....)
public void listen3(ConsumerRecord<K,V> data, Acknowledgment acknowledgment)
@KafkaListener(....)
public void listen4(ConsumerRecord<K,V> data,
Acknowledgment acknowledgment, Consumer<K,V> consumer)
@KafkaListener(....)
public void listen5(List<String> data)
@KafkaListener(....)
public void listen6(List<ConsumerRecord<K,V>> data)
@KafkaListener(....)
public void listen7(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment)
@KafkaListener(....)
public void listen8(List<ConsumerRecord<K,V>> data,
Acknowledgment acknowledgment, Consumer<K,V> consumer)
KafkaListenerContainerFactory配置
我们在application.yaml中配置的kafka参数,以spring.kafka开头的参数族,全部用于kafka默认对象的创建。
(1)所有kafka参数默认封装到对象:KafkaProperties对象中,可使用@Autowired自动注入。
@Autowired
private KafkaProperties properties;
(2)@KakfkaListener注解标记的监听实例对象,如不特殊指定,则默认使用我们在yaml中的所有spring.kafka.consumer与spring.kafka.listener下的参数。
为什么监听器实例对象会自动绑定到我们的配置文件呢?
因为它默认使用的"containerFactory" 是名为"kafkaListenerContainerFactory"的bean。
请看作者给我们留的注释。如果不特殊指定,则默认的容器工厂将会被使用。
package org.springframework.kafka.annotation;
public @interface KafkaListener ...
* The bean name of the {@link
org.springframework.kafka.config.KafkaListenerContainerFactory}
* to use to create the message listener container
responsible to serve this endpoint.
* <p>If not specified, the default container factory is used, if any.
* @return the container factory bean name.
String containerFactory() default "";
默认的容器工厂代码如下,均为Springboot与Kafka框架提供的类。代码虽然长,但是很关键。
这两个bean将spring.kafka.listener与spring.kafka.consumer下的参数全部组装到名为"kafkaListenerContainerFactory"这个bean中。该bean供@KafkaListener标记的监听实例使用。
因此可以得出结论:
如果不想使用默认的"kafkaListenerContainerFactory"容器工厂,则必须手动创建一个"ConcurrentKafkaListenerContainerFactory"类的实例,并且其bean name 不能叫"kafkaListenerContainerFactory"(不然与默认的工厂实例重名了),然后把该对象加入spring容器中。当在使用@KafkaListener标注的监听实例对象时,手动指定该注解"containerFactory"属性为刚才自定义的容器工厂实例bean name。
package org.springframework.boot.autoconfigure.kafka;
class KafkaAnnotationDrivenConfiguration {
@Bean
@ConditionalOnMissingBean
ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
ConcurrentKafkaListenerContainerFactoryConfigurer configurer =
new ConcurrentKafkaListenerContainerFactoryConfigurer();
configurer.setKafkaProperties(this.properties);
MessageConverter messageConverterToUse =
(this.properties.getListener().getType().equals(Type.BATCH))
? this.batchMessageConverter : this.messageConverter;
configurer.setMessageConverter(messageConverterToUse);
configurer.setReplyTemplate(this.kafkaTemplate);
configurer.setTransactionManager(this.transactionManager);
configurer.setRebalanceListener(this.rebalanceListener);
configurer.setErrorHandler(this.errorHandler);
configurer.setBatchErrorHandler(this.batchErrorHandler);
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
configurer.setRecordInterceptor(this.recordInterceptor);
return configurer;
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory
.getIfAvailable(() ->
new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
return factory;
自定义容器工厂实例代码示例:
@Autowired
private KafkaProperties properties;
@Bean("batchContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> listenerContainer() {
ConcurrentKafkaListenerContainerFactory<?, ?> container =
new ConcurrentKafkaListenerContainerFactory<>();
Map<String, Object> stringObjectMap = this.properties.buildConsumerProperties();
stringObjectMap.put("enable.auto.commit", false);
container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(stringObjectMap));
//没有topic是否禁止系统启动
container.setMissingTopicsFatal(true);
container.setConcurrency(1);
//批量接收
container.setBatchListener(true);
//如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。
container.getContainerProperties().setPollTimeout(5000);
//设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次
container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
//设置kafka 异常重试次数 第一个参数等待重试时间,第二个参数数提交次数,这里设置不重试,默认重试10次 抛出异常后调用
//factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 0L)));
return container;
@KafkaListener(id = "4444", groupId = "demo-group2", topics = Constants.TOPIC, containerFactory = "batchContainerFactory")
public void listen4(List<ConsumerRecord> list, Acknowledgment acknowledgment) {
LOGGER.info("4444收到消息" + list.size());
acknowledgment.acknowledge();
如下,这里我只列出了影响本例的几条参数。
spring:
kafka:
consumer:
enable-auto-commit: true
# max-poll-records: 20
listener:
ack-mode: batch
type: batch
concurrency: 5
如果我设置spring.kafka.listener.concurrency为5,共两个消费者,Topic名为"COLA",共8个分区。代码如下。
@KafkaListener(id = "4444", groupId = "demo-group2", topics = "COLA")
public void listen4(List<String> msgData) {
LOGGER.info("收到消息" + msgData);
@KafkaListener(id = "5555", groupId = "demo-group2", topics = "COLA")
public void listen5(List<String> msgData) {
LOGGER.info("收到消息" + msgData);
@Bean
public NewTopic newTopic() {
return new NewTopic(Constants.TOPIC, 8, (short) 1);
系统每个消费者都创建了5个线程,共10个线程。换句话说,每个消费者实例(@KafkaListener标记的方法)同时都会有5个线程在跑。每个线程接收的分区都不一样。
另外,这两个消费者属于同一个组,Topic只有8个分区,2个消费者共10个线程,一个线程消费一个分区,所以必然有两个线程最后属于空闲状态。
从实际结果上来看(下面的日志),没想到系统为id="4444"的消费者实际只分配到了3个分区,有两个线程处于空闲状态。id="5555"的消费者达到了预期,共消费了5个分区,分配到了5个线程!
[4444-2-C-1]: demo-group2: partitions assigned: []
[4444-3-C-1]: demo-group2: partitions assigned: []
[4444-4-C-1]: demo-group2: partitions assigned: [COLA-1]
[4444-1-C-1]: demo-group2: partitions assigned: [COLA-7]
[5555-2-C-1]: demo-group2: partitions assigned: [COLA-3]
[5555-4-C-1]: demo-group2: partitions assigned: [COLA-5]
[5555-3-C-1]: demo-group2: partitions assigned: [COLA-4]
[4444-0-C-1]: demo-group2: partitions assigned: [COLA-6]
[5555-0-C-1]: demo-group2: partitions assigned: [COLA-0]
[5555-1-C-1]: demo-group2: partitions assigned: [COLA-2]
(1)concurrency设计的是多少,每个使用@KafkaListener的消费者实例就会创建多少个线程。当然了,最后创建的线程的线程可能没有分配到分区,所以就会一直闲置到系统中。
(2)设置的并发量不能大于partition的数量,如果需要提高吞吐量,可以通过增加partition的数量达到快速提升吞吐量的效果。
这一顿分析下来,总算把我这些天的疑惑给解决了。之前只会简单使用Kafka,但是对这些参数与@KafkaListener注解的理解,还模糊不清。现在终于茅塞顿开了。
上述结论都是我加以实践与思考的得出的结论,对于Kafka小白来说或许有一定的参考价值。作为开发人员,希望自己未来在面对Kafka线上问题的时候,不再像之前一样不知所措、不明所以。