@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
...
@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
return (m, e) -> {
...
TODO…
说明从2.2.4版开始,您可以直接在注释上指定Kafka使用者属性,这些属性将覆盖在使用者工厂中配置的具有相同名称的所有属性。您不能通过这种方式指定group.id和client.id属性。他们将被忽略;可以使用#{…}或属性占位符(${…})在SpEL上配置注释上的大多数属性。比如: @KafkaListener(id = "consumer-id",topics = "SHI_TOPIC1",concurrency = "${listen.concurrency:3}",
KafkaListener有若干的配置属性,这些配置属性使用或者是结合使用,可以方便快捷的帮助我们实现kafka消费者数据监听的需求。这里的属性比较多,先大概了解一下,后续我会介绍。
通常我们会把消费者监听的主题,消费者组名称,消费者组中消费者数量等常用信息做成自定义配置(而不是在代码中写死),如下所示:
下面的消费者监听器监听了两个topic:topic-a,topic-b(使用SpEL表达式逗号分割为字符串数组),该消费者组命名为group-demo,包含5个消费者线程并行消费。
三、指定Topic分区
spring-kafka使用基于@KafkaListener注解,@KafkaListener使用方式如下
@KafkaListener(topics = "topic1")
public void kafkaListen(List<ConsumerRecord<xxx, xxx>> records) {
在注解内指定topic名称,当对应的topic内有新的消息时,testListen方法会被调用,...
Kafka 目前主要作为一个分布式的发布订阅式的消息系统使用,也是目前最流行的消息队列系统之一。因此,也越来越多的框架对 kafka 做了集成,比如本文将要说到的 spring-kafka。
Kafka 既然作为一个消息发布订阅系统,就包括消息生成者和消息消费者。本文主要讲述的 spring-kafka 框架的 kafkaListener 注解的深入解读和使用案例。
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.
最近在接手某个项目代码时,发现关于Kafka的consumer相关的代码写的很乱,consumer中写了大量的配置的代码,并且手动的拉取消息,并开启线程消费,不够优雅;
理想的做法是单独维护kafka的consumer配置,在定义consumer的bean时,指定topic和group,仅实现消费逻辑;
从kafka-clients的2.2.4版本开始,可以直接使用@KafkaListener注解来标记消费者,注解的属性将覆盖在消费者者工厂中配置的具有相同名称的所有属性,下面介绍使用方法;.........
从2.2.4版开始,您可以直接在注释上指定Kafka使用者属性,这些属性将覆盖在使用者工厂中配置的具有相同名称的所有属性。
可以使用#{…}或属性占位符(${…})在SpEL上配置注释上的大多数属性。
@KafkaListe...
@KafkaListener(id = "layer_test_consumer", topics = {"${kafka.consumer.topic.layerTestConfig}"},
groupId = "${kafka.consumer.group-id.layerTestConfig}", containerFactory = "batchContainerFactory", errorHandler = "consumerAwareListenerErrorHan..
@KafkaListener(topics = {”demo_topic_01“})
即可实现对该topic的监听
我们知道,kafka的consumer端通过从broker poll消息,然后处理
我们通过正向理解 + 结果反推 来分析
代码最终肯定会调用KafkaConsumer的poll()方法,可以通过方法栈来确定调用来源
查看poll...
Spring-Kafka整合是将Spring框架与Kafka消息系统进行整合,使得开发者能够方便地使用Spring框架进行Kafka消息的生产和消费。
Spring-Kafka整合提供了以下功能:
1. 自动配置Kafka生产者和消费者。
2. 提供KafkaTemplate用于发送消息。
3. 提供@KafkaListener注解用于监听Kafka主题。
4. 提供KafkaListenerContainerFactory用于创建Kafka监听器容器。
5. 提供KafkaAdmin用于管理Kafka集群。
Spring-Kafka整合的使用步骤如下:
1. 添加Spring-Kafka依赖
在pom.xml文件中添加以下依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
2. 配置Kafka连接
在application.properties文件中添加Kafka连接相关配置:
spring.kafka.bootstrap-servers=localhost:9092
3. 编写Kafka生产者
使用KafkaTemplate发送消息:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
4. 编写Kafka消费者
使用@KafkaListener注解监听Kafka主题:
@KafkaListener(topics = "test-topic")
public void receiveMessage(String message) {
//消费消息
5. 配置Kafka监听器容器
使用KafkaListenerContainerFactory创建Kafka监听器容器:
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
6. 配置Kafka管理器
使用KafkaAdmin创建Kafka管理器:
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new KafkaAdmin(configs);
Spring-Kafka整合的使用可以使得开发者更加方便地使用Kafka消息系统,提高消息的生产和消费效率。