本文内容为在工作中解决@KafkaListener注解需要动态指定参数内容时的解决方案。方案内容可能不是最佳,如有建议还请指正。

@KafkaListener动态指定参数内容

先还原一下问题场景。出现这个问题的原因是现在有若干集群在对于kafka消息进行消费。在不通过外部配置文件处理的情况下,现在想让集群中的每个服务都能够同时消费消息。因此我们很容易想到通过指定不同的groupId进行消费者组区分。当每个服务处于不同的消费者组时,即可同时消费消息。而由于每个服务的配置文件相同,因此只能通过动态的方式指定每个服务的消费者组id。而kafkaListener中需要配置好,解决问题的关键也就是如何把配置的内容变为动态的内容。

对于内容本身,这里采用随机数+时间戳的方式来保证消费者组id不相同。而数据又如何写进去呢?直接计算肯定不行,因此这里用到了另一个配置文件。在配置文件中拼接我们所需的数据,并将它写入到系统配置中。

@Configuration
public class KafkaGroupConfig implements InitializingBean {
    @Override
    public void afterPropertiesSet() throws Exception {
//        groupId 拼接
        String groupId = "kafka-groupId-" + Math.random() + System.currentTimeMillis();
//        系统写入
        System.setProperty("kafka-groupId",groupId);

那么我们如何在注解中使用上述已经设置好的变量呢?由于变量有依赖关系,因此首先需要在使用注解的类上进行bean顺序指定:@DependsOn(value = "kafkaGroupConfig")。而与此同时,在注解上也要做相应处理,改变获取groupId的方式:

@KafkaListener(topics = "${spring.kafka.topic_id}",groupId = "${kafka-groupId}")
public void listen (ConsumerRecord<?, ?> record, Consumer consumer) {
    logger.info("topic = {}, offset = {}, value = {}", record.topic(), record.offset(), record.value());
    Boolean processResult = process((String) record.value());
    if(processResult){
        consumer.commitAsync();
    }else {
        logger.error("topic = {}, offset = {}, value = {} 处理错误", record.topic(), record.offset(), record.value());

根据上述思路,即完成了参数内容的动态指定。而采取同样的方式,也可以完成从外部存储中动态获取参数的方式,提高了注解的灵活性。而如下图所示,groupId已经变成我们动态指定的内容。