1.2 方法:

利用Spring的SpEl表达式,将topics 配置为:

@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}")

2 特殊topic解析处理

如果需要处理特殊情况,可以定义处理类:

package com.server.util;
public class Tool {
    public static List<String> splitToList(String input, String regex) {
        List<String> array = Arrays.asList(input.split(regex));
        //随便处理array内部数据
        return array;
@KafkaListener(topics = "#{T(com.server.util.Tool).splitToList('${kafka.topics}', ',')}")

3 参考:

springboot+kafka中@KafkaListener如何动态指定多个topic_Forward233的博客-CSDN博客_kafka监听多个topic

@KafkaListener通过配置加载多个topic_傻大喵的博客-CSDN博客_kafka监听多个topic

springboot+kafka中@KafkaListener如何动态指定多个topic 本项目为springboot+kafak的整合项目,故其用了springboot中对kafak的独特消费注解@KafkaListener 首先,application.properties中配置用逗号隔开的多个topic。 运行程序,console打印的效果如下: 因为只开了一条消费者线程,所...
topics: "admin,login,client" @KafkaListener(topics = "#{'${topics}'.split(',')}",concurrency = "#{'${topics}'.split(',').length}") 2:在配置类中获取 @GetMapping("/kafka/many_consumer") public void manyConsumer() { for (int i = 0; i <1; i++) { try { Thread.sleep(3); } catch (InterruptedException ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE }) @Retention(RetentionPolicy.RUNTIME) @Messa
       接到领导的一个需求,希望封装一下kafka的消费者,可以从配置读取topic进行消费;一开始首先想到的是用java kafka的高阶api手工根据topic创建消费者,一个topic创建一个消费者,依赖zookeeper完成kafka内部的balance和其他管理。后来领导又提出不要依赖zookeeper,之前老是rebalance失败。        调研了一下,手工实现类似sp...
public class KafkaConfig implements InitializingBean { @Override public void afterPropertiesSet() throws Exception { String topicName = wireTopics(); Sys...
spring boot 在集成kafka 消费端使用@KafkaListener时候要指定topic,在实际应用时候,可能需要通过配置来指定topic。 首先写一个KafkaTopicConfig类 @Configuration public class KafkaTopicConfig implements InitializingBean { @Value("${kafka.topic}") private String topic; @Override public v
@KafkaListener 是一个非常方便的注解,可以用于将一个方法标记为 Kafka 消息监听器。以下是使用 @KafkaListener 的步骤: 1. 添加依赖:在 pom.xml 文件中添加以下依赖: ```xml <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.5.7.RELEASE</version> </dependency> 2. 配置 Kafka:在 application.properties 文件中配置 Kafka 的相关信息,如下所示: spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=my-group 3. 创建监听器方法:创建一个方法,用于处理从 Kafka 主题中接收到的消息。 ```java @KafkaListener(topics = "my-topic", groupId = "my-group") public void listen(String message) { System.out.println("Received message: " + message); 4. 启动应用程序:启动 Spring Boot 应用程序,@KafkaListener 注解将自动注册为 Kafka 消息监听器。 @KafkaListener 注解还支持其他参数,例如指定消息的反序列化器、消息的分区等。更多详细信息可以参考 Spring Kafka 的官方文档。