kafka动态指定主题与分组ID

近期项目新做了一个环境,采购了阿里的CLB进行四层代理的负载均衡,每个服务都搭建了两个实例,后来测试过程中,遇到了一个问题,推送服务有时没数据

两个相同实例,都作为kafka监听者,存在只有一个实例消费到数据,之前设置了GroupId,但是集群环境下还是会存在组内消费竞争的问题。

请注意自己的业务场景!
请注意自己的业务场景!
请注意自己的业务场景!

1.配置kafka工厂

注意:此时应将groupid写入yml或properties配置文件

@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String brokers;
    @Value("${group.android}")
    private String tcpAndroid;
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory4WS() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory4WS());
        factory.setConcurrency(2);
        factory.getContainerProperties().setPollTimeout(4000);
        return factory;
    public Map<String, Object> getCommonPropertis() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, tcpAndroid);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return properties;
    public ConsumerFactory<String, String> consumerFactory4WS() {
        Map<String, Object> properties = getCommonPropertis();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, tcpAndroid);
        return new DefaultKafkaConsumerFactory<>(properties);

2.kafka监听者配置

@KafkaListener(topics = {"${topic.msg}"}, containerFactory = "kafkaListenerContainerFactory4WS")

此时,kafka可以指定工厂,名字在第1步已经配置好了

如果你的业务场景需要每个实例都需要数据,可以采用这个方式。 因为我们是推送业务,必须保证每个实例都能消费到数据,所以,你懂的。

kafka动态指定主题与分组ID原因近期项目新做了一个环境,采购了阿里的CLB进行四层代理的负载均衡,每个服务都搭建了两个实例,后来测试过程中,遇到了一个问题,推送服务有时没数据问题描述两个相同实例,都作为kafka监听者,存在只有一个实例消费到数据,之前设置了GroupId,但是集群环境下还是会存在组内消费竞争的问题。注意请注意自己的业务场景!请注意自己的业务场景!请注意自己的业务场景!解决方案1.配置kafka工厂注意:此时应将groupid写入yml或properties配置文 在没有配置的情况下,group.id是自动生成的,如果想要人为定义,如下 cd /home/kafka/software/kafka_2.10-0.9.0.1/config cp consumer.properties consumer1.properties 修改group.id内容为自己指定group.id,如下 vim consumer1.properties 启动消费者时指定group.id的配置文件 ./bin/kafka-console-consumer.sh --zookeep @KafkaListener是kafka消费者,topics是其主题名,groupId是组名; 属性值一般只支持常量,再集群情况下,topics、groupId如果不是动态的,那集群环境中只有一台能消费主题上的任务; 2.动态指定topics、groupId两个属性 @KafkaListener中有一个beanRef属性,专门获取spring容器中的bean; beanRef:此注释中的SpEL表达式中使用的伪bean名称,用于引用定义此侦听器的当前.
topicgroup质检是发布订阅的通信方式,即一条topic会被所有的group消费,属于一对多模式;group到consumer是点对点通信方式,属于一对一模式。 不使用group的话,启动10个consumer消费一个topic,这10个consumer都能得到topic的所有数据,相当于这个topic中的任一条消息被消费10次。 使用group的话,连接时带上groupidtopic的...
有人说世界上有三个伟大的发明:火,轮子,以及 Kafka。 发展到现在,Apache Kafka 无疑是很成功的,Confluent 公司曾表示世界五百强中有三分之一的企业在使用 Kafka。今天便和大家分享一下 Kafka 相关知识点,高性能、持久化、多副本备份、横向扩展...... 万字长文,做好准备,建议先收藏再看! 1、为什么有消息系统 1. 解耦合 2. 异步处理 例如电商平台,秒杀活动。一般流程会分为:1:风险控制、2:库存锁定、3:生成订单、4:短信通知、5:.
producer:消息生产者,就是向kafka broker发消息的客户端; consumer:消息消费者,向kafka broker取消息的客户端; Consumer Group消费者组,由多个consumer组成,消费者组内的每个消费者负责消费不同分区的数据,一个分区只能有一个组内消费消费消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者; broker...
topicgroup质检是发布订阅的通信方式,即一条topic会被所有的group消费,属于一对多模式;group到consumer是点对点通信方式,属于一对一模式。 不使用group的话,启动10个consumer消费一个topic,这10个consumer都能得到topic的所有数据,相当于这个topic中的任一条消息被消费10次。 使用group的话,连接时带上groupidtopic的消息会分发到10个consumer上,每条消息只被消费1次。
前几天为了省事,在申请group的时候,就使用了原来的group,本来以为group从属于某一个topictopic不同,group之间相互不会影响,但实际情况不是这样的。 kafka不同topic的consumer如果用的groupid名字一样的情况下,其中任意一个topic的consumer重新上下线都会造成剩余所有的consumer产生reblance行为, 即使大家不是同一个topi...