Multi-threaded requests and multi-threaded consumption with Kafka under Spring Boot

1. Requirement description:

A requirement came in: module A sends data collected from certain channels to Kafka, and module B consumes it from Kafka. The topic was set up as r2p5, i.e. with 5 partitions; to maximize consumption speed, the consumer code uses five threads.
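For reference, here is a minimal sketch of creating such a topic with the Kafka AdminClient, assuming the name r2p5 stands for replication factor 2 / 5 partitions (the broker address is a placeholder, not from the original setup):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class TopicCreator {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 5 partitions so five consumer threads can each own one; replication factor 2
            NewTopic topic = new NewTopic("r2p5", 5, (short) 2);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}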

The finished producer code is as follows:

<!-- pom.xml: pull in the Spring Kafka dependency -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <!-- the version is inherited from the parent POM and omitted here -->
</dependency>
// Java code
@RestController
public class TestController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/test")
    public void test() {
        kafkaTemplate.send("topicName", "msg");
    }
}

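Section 2 below rules out producer-side loss. One way to verify that (a hedged sketch, not the original code) is to attach a callback to the asynchronous send() so failures at least surface in the logs; in spring-kafka 2.x send() returns an org.springframework.util.concurrent.ListenableFuture (newer versions return a CompletableFuture instead):

// inside test(): log the outcome of each send
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topicName", "msg");
future.addCallback(
        result -> System.out.println("sent to partition " + result.getRecordMetadata().partition()
                + " at offset " + result.getRecordMetadata().offset()),
        ex -> System.err.println("send failed: " + ex.getMessage()));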
The consumer code is as follows (the configuration class was found through a Baidu search, because the real code lives on an internal network and retyping it by hand is tedious):

1) Configuration class

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    static final String list = "10.28.18.103:6667";

    /**
     * Description: consumer configuration
     * Date: 2017-07-11
     * @author shaqf
     */
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, list);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        System.out.println("KafkaConsumer consumerConfigs " + JSON.toJSONString(props));
        return props;
    }

    /** Consumer factory */
    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /** Listener container factory: concurrency 5 matches the topic's 5 partitions */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory1 = new ConcurrentKafkaListenerContainerFactory<>();
        factory1.setConsumerFactory(consumerFactory());
        factory1.setConcurrency(5);
        factory1.getContainerProperties().setPollTimeout(3000);
        // the two lines below are needed for the batch listener with manual acks shown next
        factory1.setBatchListener(true);
        factory1.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        System.out.println("KafkaConsumer kafkaListenerContainerFactory factory" + JSON.toJSONString(factory1));
        return factory1;
    }
}
2) Consumer Java code

@Controller
public class ConsumerController {

    @KafkaListener(containerFactory = "kafkaListenerContainerFactory",
            id = "#{'${spring.kafka.consumer.group-id}'}",
            topics = "#{'${spring.kafka.topic}'}")
    public void batchListener(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
        // process the received records here
        ack.acknowledge(); // commit the offsets manually, since auto-commit is disabled
    }
}
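The SpEL placeholders in the @KafkaListener annotation above assume application.properties entries along these lines (the values here are illustrative, chosen to match the config class):

spring.kafka.consumer.group-id=group1
spring.kafka.topic=topicName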

2. Problem description

With Postman sending batches of data from a single thread, everything was consumed normally. With JMeter sending batches from five threads, however, some consumed data went missing. Testing ruled out the producer losing data, and ZooKeeper runs on a company test server deployed long ago, so in theory it is not the problem. Digging further, it turns out that the Kafka consumer itself is not thread-safe and needs special handling for multi-threaded use. I hit a few problems while copying multi-threaded consumer code from the internet, so I am recording them here:
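A minimal sketch of the thread-safety problem (hypothetical code, not from the project): sharing one KafkaConsumer across threads makes the client itself fail fast with a ConcurrentModificationException.

Properties props = new Properties(); // minimal config; servers/group/deserializers as elsewhere in this post
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "demo");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> shared = new KafkaConsumer<>(props);
shared.subscribe(Collections.singleton("test001"));
Runnable pollLoop = () -> {
    while (true) {
        shared.poll(Duration.ofMillis(100));
    }
};
new Thread(pollLoop).start();
new Thread(pollLoop).start(); // one thread soon dies with ConcurrentModificationException:
                              // "KafkaConsumer is not safe for multi-threaded access"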

Referenced blog post: "springboot集成kafka多线程定时消费" (Spring Boot + Kafka multi-threaded scheduled consumption) by weixin_40510917 on CSDN.

Second version of the consumer handler class:

package com.example.demo_kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component
public class ConsumerHandler {

    // the Kafka consumer instance
    private KafkaConsumer<Object, Object> consumer;
    // thread pool that processes the records
    private ExecutorService executors;

    // Kafka consumer configuration
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.1.240:9092");
        props.put("group.id", "test01");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // the two settings below were added by me; without them the error in section 3 occurs
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
        return props;
    }

    // initialize the Kafka connection
    @PostConstruct
    public void initKafkaConfig() {
        Properties properties = initConfig();
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("test001"));
    }

    /**
     * Consume Kafka records with multiple threads.
     * @param workerNum number of workers (note: not actually used; the pool size is hard-coded)
     */
    public void execute(int workerNum) {
        executors = new ThreadPoolExecutor(3, 5, 5000L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        while (true) {
            if (consumer != null) {
                ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofMillis(100));
                if (!consumerRecords.isEmpty()) {
                    for (final ConsumerRecord record : consumerRecords) {
                        // OneWork (defined below) processes a single record
                        executors.submit(new OneWork(record));
                    }
                    commitOffset();
                }
            } else {
                // recreate the consumer if it was lost
                Properties props = initConfig();
                consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Collections.singleton("test001"));
            }
        }
    }

    // commit asynchronously; if that throws, fall back to a synchronous commit
    private void commitOffset() {
        try {
            consumer.commitAsync();
        } catch (Exception e) {
            consumer.commitSync();
        }
    }
}
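For comparison, the commitAsync-with-commitSync-fallback idea above is close to the pattern usually recommended for the plain Kafka client: commit asynchronously on the hot path and make one final synchronous commit on shutdown, rather than falling back on every exception. A hedged sketch, assuming a volatile running flag for shutdown:

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        // dispatch records for processing here
        consumer.commitAsync(); // non-blocking; an occasional failure is covered by later commits
    }
} finally {
    try {
        consumer.commitSync(); // blocking and retried internally: a best-effort final commit
    } finally {
        consumer.close();
    }
}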
The worker class that processes a single record:

package com.netintech.kafka.impl;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Multi-threaded Kafka consumption worker.
 */
public class OneWork implements Runnable {

    // logger
    private static final Logger LOG = LoggerFactory.getLogger(OneWork.class);

    private ConsumerRecord<String, String> consumerRecord;

    public OneWork(ConsumerRecord record) {
        this.consumerRecord = record;
    }

    @Override
    public void run() {
        try {
            // business logic for a single record goes here
            // todo
        } catch (Exception e) {
            LOG.info("Exception while processing record: " + e.getMessage());
        }
    }
}
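An alternative worth knowing (not what this post does): the "one consumer per thread" model, where each thread owns a private KafkaConsumer in the same group, avoids the shared-consumer problem entirely; with group.id test01, five instances would split the five partitions among themselves. A sketch under those assumptions, reusing ConsumerHandler.initConfig() and OneWork from above:

public class PerThreadConsumer implements Runnable {
    @Override
    public void run() {
        // each thread builds and uses its own consumer; nothing is shared across threads
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(ConsumerHandler.initConfig())) {
            consumer.subscribe(Collections.singleton("test001"));
            while (!Thread.currentThread().isInterrupted()) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
                    new OneWork(record).run(); // process in place on this thread
                }
            }
        }
    }
}

// start one thread per partition:
// for (int i = 0; i < 5; i++) new Thread(new PerThreadConsumer(), "kafka-worker-" + i).start();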

The actual invocation uses a scheduled task:

@Controller
@EnableScheduling
public class ConController {

    @Autowired
    private ConsumerHandler consumers;

    @Scheduled(cron = "${work.start:0/1 * * * * ?}")
    public void listen() {
        // I did not actually use this 5 in practice; the value is hard-coded
        // in the handler, but it could be made configurable if needed
        consumers.execute(5);
    }
}
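Note that because execute() contains while (true), the first cron invocation never returns, so the scheduler is really just starting the loop once. A hedged alternative sketch (my assumption, not the post's code) that makes this explicit by starting the loop once at application startup:

@Component
public class ConsumerStarter implements ApplicationRunner {

    @Autowired
    private ConsumerHandler consumers;

    @Override
    public void run(ApplicationArguments args) {
        // run the never-ending consumer loop on a dedicated thread
        new Thread(() -> consumers.execute(5), "kafka-consumer-loop").start();
    }
}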

3. The error (before the two extra parameters were added)

org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with
a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException: null
    

After adding the two parameters, the logs show no errors and no data appears to be lost, although whether the records reliably make it into the database still needs verification. Honestly, I do not yet understand the root cause of the error or why adding these two parameters fixes it. One unverified guess: DisconnectException suggests the connection to the broker was dropped before the offset-commit request got a response, and a larger request.timeout.ms simply gives the slow test broker more time to reply before the client gives up; this remains to be confirmed.