Multi-threaded requests and multi-threaded consumption with Kafka under Spring Boot
1. Requirement description:
The task: module A sends data collected from certain channels to Kafka, and module B consumes it from Kafka. The topic was set up as r2p5, i.e. with 5 partitions; to maximize consumption speed, the code runs five consumer threads.
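For context, here is a minimal sketch of creating such a topic with Kafka's AdminClient, assuming the name r2p5 encodes 2 replicas and 5 partitions (the text above only states the partition count; the replication factor of 2 is my reading of the name, and the broker address is borrowed from the consumer config further down):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.28.18.103:6667"); // assumed broker address
        try (AdminClient admin = AdminClient.create(props)) {
            // 5 partitions so five consumer threads can each own one; replication factor 2
            admin.createTopics(Collections.singleton(new NewTopic("r2p5", 5, (short) 2))).all().get();
        }
    }
}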
The producer code, once developed, looks like this:
<!-- pom.xml: pull in the spring-kafka dependency -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <!-- the version number is inherited from the parent POM and omitted here -->
</dependency>
// Java code
@RestController
public class TestController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/test")
    public void test() {
        kafkaTemplate.send("topicName", "msg");
    }
}
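Since producer-side loss is ruled out later in this post, it is worth showing how that can be checked. A minimal sketch, assuming a spring-kafka 2.x version where send() returns a ListenableFuture (3.x returns a CompletableFuture instead); failed sends then show up in the log rather than vanishing silently:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
public class CheckedSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String msg) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topicName", msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // result.getRecordMetadata() carries the partition/offset, useful for auditing
            }

            @Override
            public void onFailure(Throwable ex) {
                System.err.println("send failed: " + ex.getMessage()); // a real app would log and retry
            }
        });
    }
}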
The consumer code is as follows (the configuration class was found via Baidu, because the real code lives on an intranet and retyping it by hand is tedious):
1) The configuration class
import java.util.Map;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Maps;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    final static String list = "10.28.18.103:6667";

    /**
     * Description: build the consumer configuration
     * Date: 2017-07-11
     * @author shaqf
     */
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, list);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        System.out.println("KafkaConsumer consumerConfigs " + JSON.toJSONString(props));
        return props;
    }

    /** build the consumer factory */
    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /** expose the listener container factory; concurrency = 5 gives one consumer thread per partition of the 5-partition topic */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory1 = new ConcurrentKafkaListenerContainerFactory<>();
        factory1.setConsumerFactory(consumerFactory());
        factory1.setConcurrency(5);
        factory1.setBatchListener(true); // required for the List<ConsumerRecord> listener below
        // manual ack mode is required for the listener's Acknowledgment parameter
        // (import location shown is for spring-kafka >= 2.2; older versions keep AckMode elsewhere)
        factory1.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory1.getContainerProperties().setPollTimeout(3000);
        System.out.println("KafkaConsumer kafkaListenerContainerFactory factory" + JSON.toJSONString(factory1));
        return factory1;
    }
}
2) The consumer Java code
@Controller
public class ConsumerController {
    // containerFactory must name the factory bean above ("kafkaListenerContainerFactory"), not the private consumerFactory() method
    @KafkaListener(containerFactory = "kafkaListenerContainerFactory", id = "#{'${spring.kafka.consumer.group-id}'}", topics = "#{'${spring.kafka.topic}'}")
    public void batchListener(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
        // process the fetched records here, then commit manually
        ack.acknowledge();
    }
}
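The SpEL expressions in @KafkaListener assume two entries in the application configuration; hypothetical application.properties values (the property names come from the SpEL above, the values are placeholders):

spring.kafka.consumer.group-id=group1
spring.kafka.topic=topicName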
2. Problem description
With Postman sending batches of data from a single thread, everything arrives intact; with JMeter firing batch sends from five threads, some of the consumed data goes missing. Verification ruled out the producer dropping data, and ZooKeeper runs on a company test server deployed long ago, so in theory it should not be the problem. Further digging showed that the Kafka consumer itself is not thread-safe, so the consumer side needs special handling. I hit a few problems while copying multi-threaded code from the web, recorded below.
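Concretely, "not thread-safe" means a single KafkaConsumer instance must only ever be used from one thread (sharing it across threads fails with ConcurrentModificationException: "KafkaConsumer is not safe for multi-threaded access"). A minimal sketch of the standard one-consumer-per-thread alternative, reusing the broker/topic/group names from the handler shown below:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PerThreadConsumer implements Runnable {

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.1.240:9092");
        props.put("group.id", "test01");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // each thread owns its own consumer; the group protocol hands each one a share of the partitions
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("test001"));
            while (!Thread.currentThread().isInterrupted()) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
                    // process record on this thread; offsets auto-commit by default
                }
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) { // one thread per partition of a 5-partition topic
            new Thread(new PerThreadConsumer(), "consumer-" + i).start();
        }
    }
}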
Reference: the blog post springboot集成kafka多线程定时消费 by weixin_40510917 on CSDN.
The second version of the consumer handler class:
package com.example.demo_kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component
public class ConsumerHandler {

    // the Kafka consumer; only ever touched by the single thread running execute()
    private KafkaConsumer<Object, Object> consumer;

    // thread pool that runs the per-record workers
    private ExecutorService executors;

    // Kafka consumer properties
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.1.240:9092");
        props.put("group.id", "test01");
        props.put("enable.auto.commit", "true"); // note: auto-commit stays on even though commitOffset() also commits manually
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // the next two settings are my addition; without them the error in section 3 appears
        // (the second one overrides the 30000 set above)
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
        return props;
    }

    // initialize the Kafka connection
    @PostConstruct
    public void initKafkaConfig() {
        Properties properties = initConfig();
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("test001"));
    }

    /**
     * Consume Kafka records on multiple threads
     * @param workerNum currently unused; the pool size is hard-coded below
     */
    public void execute(int workerNum) {
        // core 3, max 5 threads; CallerRunsPolicy makes the polling thread help out when the queue is full
        executors = new ThreadPoolExecutor(3, 5, 5000L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        // note: this loop never exits, so a caller (e.g. the @Scheduled method below) never returns from execute()
        while (true) {
            if (consumer != null) {
                ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofMillis(100));
                if (!consumerRecords.isEmpty()) {
                    for (final ConsumerRecord record : consumerRecords) {
                        executors.submit(new OneWork(record)); // hand each record to the worker pool (the Runnable is shown below)
                    }
                    commitOffset();
                }
            } else {
                // consumer lost: rebuild it from the same properties
                Properties props = initConfig();
                consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Collections.singleton("test001"));
            }
        }
    }

    private void commitOffset() {
        try {
            consumer.commitAsync();
        } catch (Exception e) {
            consumer.commitSync(); // fall back to a synchronous commit if the async one fails
        }
    }
}
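One thing worth flagging in execute(): offsets are committed as soon as the records have been handed to the pool, not after the workers finish, so records still queued or mid-processing are lost if the process dies after the commit. That is exactly the kind of window that loses data. A minimal sketch of committing only after the batch completes (pollOnce is a hypothetical helper, not the author's code):

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SafeCommitPoller {

    // drain one poll, wait for every worker, then commit
    static void pollOnce(KafkaConsumer<Object, Object> consumer, ExecutorService executors) throws Exception {
        ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofMillis(100));
        List<Future<?>> futures = new ArrayList<>();
        for (ConsumerRecord<Object, Object> record : records) {
            futures.add(executors.submit(new OneWork(record)));
        }
        for (Future<?> f : futures) {
            f.get(); // blocks until the worker finishes and rethrows any worker exception
        }
        consumer.commitAsync(); // offsets now only cover records that were actually processed
    }
}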
package com.netintech.kafka.impl;

import com.alibaba.fastjson.JSONObject;
import com.netintech.kafka.bean.Test;
import com.netintech.kafka.service.TestService;
import com.netintech.kafka.task.SendVehicleInfo;
import com.netintech.kafka.utils.SpringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.annotation.Transactional;

/**
 * Multi-threaded Kafka consumption worker: one instance per record, run on the pool
 */
public class OneWork implements Runnable {

    // logger
    private static final Logger LOG = LoggerFactory.getLogger(OneWork.class);

    private ConsumerRecord<String, String> consumerRecord;

    public OneWork(ConsumerRecord record) {
        this.consumerRecord = record;
    }

    @Override
    public void run() {
        try {
            // record-handling logic goes here
            // todo
        } catch (Exception e) {
            LOG.info("Exception: " + e.getMessage());
        }
    }
}
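A side note on the imports above: OneWork is created with new inside ConsumerHandler, so it is not a Spring-managed bean, and the imported @Transactional has no effect on it (there is no proxy). The usual workaround, presumably what the SpringUtils import is for, is to look up a managed service bean and delegate to it. A hypothetical sketch of what run() might do, where SpringUtils.getBean and TestService.process are assumed names rather than anything confirmed from the original code:

@Override
public void run() {
    try {
        // fetch a Spring-managed service whose @Transactional methods actually run through a proxy
        TestService service = SpringUtils.getBean(TestService.class); // assumed helper signature
        service.process(consumerRecord.value()); // hypothetical transactional method
    } catch (Exception e) {
        LOG.info("Exception: " + e.getMessage());
    }
}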
The actual invocation is driven by a scheduler:
@Controller
@EnableScheduling
public class ConController {

    @Autowired
    private ConsumerHandler consumers;

    @Scheduled(cron = "${work.start:0/1 * * * * ?}")
    public void listen() {
        // the 5 here is not actually used; in my real code the pool size is hard-coded, but it could be made configurable
        consumers.execute(5);
    }
}
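Since execute() contains a while (true) loop that never returns, the cron trigger above effectively fires it only once (on the default single-threaded scheduler, later ticks are skipped while the first invocation is still running). A minimal sketch of an alternative that makes the long-running nature explicit, assuming the ConsumerHandler above; this replaces the scheduling machinery rather than extending the author's code:

import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ConsumerStarter {

    @Autowired
    private ConsumerHandler consumers;

    @PostConstruct
    public void start() {
        // one long-lived thread drives the poll loop for the lifetime of the application
        new Thread(() -> consumers.execute(5), "kafka-poll-loop").start();
    }
}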
3. The error (before the two extra settings were added)
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException: null

With the two settings in place, the logs show no errors and no data is lost, though whether the records reliably make it into the database still needs verification. I also do not really understand the root cause of the error, or why adding those two settings fixes it; that remains to be investigated. (One plausible but unverified reading: the connection is cut when a request exceeds request.timeout.ms, which surfaces on an async offset commit as a DisconnectException wrapped in RetriableCommitFailedException, so the larger timeout gives slow commits room to complete.)