1.2 方法:
利用Spring的SpEl表达式,将topics 配置为:
@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}")
2 特殊topic解析处理
如果需要处理特殊情况,可以定义处理类:
package com.server.util;
public class Tool {
public static List<String> splitToList(String input, String regex) {
List<String> array = Arrays.asList(input.split(regex));
//随便处理array内部数据
return array;
@KafkaListener(topics = "#{T(com.server.util.Tool).splitToList('${kafka.topics}', ',')}")
3 参考:
springboot+kafka中@KafkaListener如何动态指定多个topic_Forward233的博客-CSDN博客_kafka监听多个topic
@KafkaListener通过配置加载多个topic_傻大喵的博客-CSDN博客_kafka监听多个topic
springboot+kafka中@KafkaListener如何动态指定多个topic
本项目为springboot+kafak的整合项目,故其用了springboot中对kafak的独特消费注解@KafkaListener
首先,application.properties中配置用逗号隔开的多个topic。
运行程序,console打印的效果如下:
因为只开了一条消费者线程,所...
topics: "admin,login,client"
@KafkaListener(topics = "#{'${topics}'.split(',')}",concurrency = "#{'${topics}'.split(',').length}")
2:在配置类中获取
@GetMapping("/kafka/many_consumer")
public void manyConsumer() {
for (int i = 0; i <1; i++) {
try {
Thread.sleep(3);
} catch (InterruptedException
ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Messa
接到领导的一个需求,希望封装一下kafka的消费者,可以从配置读取topic进行消费;一开始首先想到的是用java kafka的高阶api手工根据topic创建消费者,一个topic创建一个消费者,依赖zookeeper完成kafka内部的balance和其他管理。后来领导又提出不要依赖zookeeper,之前老是rebalance失败。
调研了一下,手工实现类似sp...
public class KafkaConfig implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
String topicName = wireTopics();
Sys...
spring boot 在集成kafka 消费端使用@KafkaListener时候要指定topic,在实际应用时候,可能需要通过配置来指定topic。
首先写一个KafkaTopicConfig类
@Configuration
public class KafkaTopicConfig implements InitializingBean {
@Value("${kafka.topic}")
private String topic;
@Override
public v
@KafkaListener 是一个非常方便的注解,可以用于将一个方法标记为 Kafka 消息监听器。以下是使用 @KafkaListener 的步骤:
1. 添加依赖:在 pom.xml 文件中添加以下依赖:
```xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.7.RELEASE</version>
</dependency>
2. 配置 Kafka:在 application.properties 文件中配置 Kafka 的相关信息,如下所示:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
3. 创建监听器方法:创建一个方法,用于处理从 Kafka 主题中接收到的消息。
```java
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
System.out.println("Received message: " + message);
4. 启动应用程序:启动 Spring Boot 应用程序,@KafkaListener 注解将自动注册为 Kafka 消息监听器。
@KafkaListener 注解还支持其他参数,例如指定消息的反序列化器、消息的分区等。更多详细信息可以参考 Spring Kafka 的官方文档。