springboot2教程系列
写性能非常高,因此,经常会碰到Kafka消息队列拥堵的情况 经测试,如果该topic只有一个分区,实际上再启动一个新的消费者,没有作用 。
ConcurrentKafkaListenerContainerFactory并且设置了factory.setConcurrency(4); (我的topic有4个分区,为了加快消费将并发设置为4,也就是有4个KafkaMessageListenerContainer)
操作Topic
配置
@Component
public class PrividerKafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootStrapServer;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> props = new HashMap<>();
//配置Kafka实例的连接地址
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
KafkaAdmin admin = new KafkaAdmin(props);
return admin;
@Bean
public AdminClient adminClient() {
return AdminClient.create(kafkaAdmin().getConfig());
}
Controller层
@RestController
@Slf4j
public class TopicController {
@Autowired
private AdminClient adminClient;
@ApiOperation(value = "创建topic")
@ApiImplicitParams({
@ApiImplicitParam(name = "topicName", value = "topic名称",defaultValue = "first_top",
required = true, dataType = "string", paramType = "query"),
@ApiImplicitParam(name = "partitions", value = "分区数", defaultValue = "4",
required = true, dataType = "int", paramType = "query"),
@ApiImplicitParam(name = "replicationFactor", value = "副本数", defaultValue = "1",
required = true, dataType = "int", paramType = "query")
@GetMapping("/createTopic")
public String createTopic(String topicName,int partitions,int replicationFactor){
adminClient.createTopics(Arrays.asList(new NewTopic(topicName,partitions,(short)replicationFactor)));
return "create success";
@ApiOperation(value = "查看所有的topic")
@GetMapping("/findAllTopic")
public String findAllTopic() throws ExecutionException, InterruptedException {
ListTopicsResult result = adminClient.listTopics();
Collection<TopicListing> list = result.listings().get();
List<String> resultList = new ArrayList<>();
for(TopicListing topicListing : list){
resultList.add(topicListing.name());
return JSON.toJSONString(resultList);
@ApiOperation(value = "查看topic详情")
@ApiImplicitParams({
@ApiImplicitParam(name = "topicName", value = "topic名称",defaultValue = "first_top",
required = true, dataType = "string", paramType = "query")
@GetMapping("/info")
public String topicInfo(String topicName) throws ExecutionException, InterruptedException {
DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList(topicName));
Map<String,String> resultMap = new HashMap<>();
result.all().get().forEach((k,v)->{
log.info("k: "+k+" ,v: "+v.toString());
resultMap.put(k,v.toString());
return JSON.toJSONString(resultMap);
@ApiOperation(value = "删除topic")
@ApiImplicitParams({
@ApiImplicitParam(name = "topicName", value = "topic名称",defaultValue = "first_top",
required = true, dataType = "string", paramType = "query")
@GetMapping("/delete")
public String deleteTopic(String topicName){
DeleteTopicsResult result = adminClient.deleteTopics(Arrays.asList(topicName));
return JSON.toJSONString(result.values());
}
AdminClient常用方法还有
-
创建Topic:createTopics(Collection newTopics)
-
删除Topic:deleteTopics(Collection topics)
-
罗列所有Topic:listTopics()
-
增加分区:createPartitions(Map<String, NewPartitions> newPartitions)
-
查询Topic:describeTopics(Collection topicNames)
-
查询集群信息:describeCluster()
-
查询ACL信息:describeAcls(AclBindingFilter filter)
-
创建ACL信息:createAcls(Collection acls)
-
删除ACL信息:deleteAcls(Collection filters)
-
查询配置信息:describeConfigs(Collection resources)
-
修改配置信息:alterConfigs(Map<ConfigResource, Config> configs)
-
修改副本的日志目录:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)
-
查询节点的日志目录信息:describeLogDirs(Collection brokers)
-
查询副本的日志目录信息:describeReplicaLogDirs(Collection replicas)
发送消息
KafkaTemplate发送消息是采取异步方式发送的
发送消息三种方式
//发送带有时间戳的消息
template.send(topic, 0, System.currentTimeMillis(), "0", msg);
//使用ProducerRecord发送消息
ProducerRecord record = new ProducerRecord(topic, msg);
template.send(record);
//使用Message发送消息
Map map = new HashMap();
map.put(KafkaHeaders.TOPIC, topic);
map.put(KafkaHeaders.PARTITION_ID, 0);
map.put(KafkaHeaders.MESSAGE_KEY, "0");
GenericMessage message = new GenericMessage(msg,new MessageHeaders(map));
template.send(message);
消息结果回调
@Component
@Slf4j
public class KafkaSendResultHandler implements ProducerListener {
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
log.info("Message send success : " + producerRecord.toString());
@Override
public void onError(ProducerRecord producerRecord, Exception exception) {
log.info("Message send error : " + producerRecord.toString());
}
发送同步消息
@GetMapping("/syncMsg")
public String syncMsg(@RequestParam String topic, @RequestParam String msg){
try {
template.send(topic, msg).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
return "success";
}
消费消息
Spring-Kafka中消息监听大致分为两种类型,一种是单条数据消费,一种是批量消费;
GenericMessageListener
@Bean
public KafkaMessageListenerContainer demoListenerContainer(ConsumerFactory consumerFactory) {
ContainerProperties properties = new ContainerProperties("topic3");
properties.setGroupId("group1");
//批量消费
properties.setMessageListener(new MessageListener<Integer,String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> record) {
log.info("topic3: " + record.toString());
});*/
//批量消费
properties.setMessageListener(
new BatchAcknowledgingConsumerAwareMessageListener<String,String>(){
@Override
public void onMessage(List<ConsumerRecord<String, String>> list,
Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
log.info("size:{}",list.size());
return new KafkaMessageListenerContainer(consumerFactory, properties);
}
其它MessageListener,BatchAcknowledgingConsumerAwareMessageListener为GenericMessageListener的实现类
@KafkaListener
@Component
@Slf4j
public class KafkaConsumer {
//单条消息
@KafkaListener(topics = {"first_top2"})
public void consumer(ConsumerRecord<?, ?> record){
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("record =" + record);
log.info(" message =" + message);
//批量消息
@KafkaListener(topics = {"first_top"},containerFactory="batchFactory")
public void consumerBatch(List<ConsumerRecord<?, ?>> record){
log.info("接收到消息数量:{}",record.size());
}
@Bean
public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory consumerFactory){
ConcurrentKafkaListenerContainerFactory<Integer,String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(10);
factory.getContainerProperties().setPollTimeout(1500);
factory.setBatchListener(true);//设置为批量消费,每个批次数量在Kafka配置参数中设置
return factory;
}
application.yml
messages:
basename: i18n/Messages,i18n/Pages
kafka:
#bootstrap-servers: 10.10.2.138:9092,10.10.2.138:9093,10.10.2.138:9094
bootstrap-servers: 47.106.106.53:9092
template:
default-topic: self-topic0
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: myGroup998
# 最早未被消费的offset
auto-offset-reset: earliest
# 批量一次最大拉取数据量
max-poll-records: 1000
# 自动提交
enable-auto-commit: true
consumer-extra:
# 是否批量处理
batch-listener: true
@KafkaListener 属性
-
id:消费者的id,当GroupId没有被配置的时候,默认id为GroupId
-
containerFactory:上面提到了@KafkaListener区分单数据还是多数据消费只需要配置一下注解的containerFactory属性就可以了,这里面配置的是监听容器工厂,也就是ConcurrentKafkaListenerContainerFactory,配置BeanName
-
topics:需要监听的Topic,可监听多个
-
topicPartitions:可配置更加详细的监听信息,必须监听某个Topic中的指定分区,或者从offset为200的偏移量开始监听
-
errorHandler:监听异常处理器,配置BeanName
-
groupId:消费组ID
-
idIsGroup:id是否为GroupId
-
clientIdPrefix:消费者Id前缀
-
beanRef:真实监听容器的BeanName,需要在 BeanName前加 “__”
监听Topic中指定的分区
@KafkaListener(id = "id0", containerFactory="batchFactory",
topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
log.info("Id0 records size " + records.size());
for (ConsumerRecord<?, ?> record : records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
log.info("Received: " + record);
if (kafkaMessage.isPresent()) {
Object message = record.value();
String topic = record.topic();
log.info("p0 Received message={}", message);
}
注解方式获取消息头及消息体
@KafkaListener(id = "group3", topics = "first_top3")
public void annoListener(@Payload String data,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) String ts) {
log.info(" receive : \n"+
"data : "+data+"\n"+
"key : "+key+"\n"+
"partitionId : "+partition+"\n"+
"topic : "+topic+"\n"+
"timestamp : "+ts+"\n"
}
使用Ack机制确认消费
RabbitMQ的消费可以说是一次性的,也就是你确认消费后就
立刻从硬盘或内存中删除
,而且RabbitMQ粗糙点来说是顺序消费,像排队一样,一个个顺序消费,未被确认的消息则会重新回到队列中,等待监听器再次消费。
但Kafka不同,Kafka是通过最新保存偏移量进行消息消费的,而且确认消费的消息并不会立刻删除,所以我们可以重复的消费未被删除的数据,当第一条消息未被确认,而第二条消息被确认的时候,Kafka会保存第二条消息的偏移量,也就是说第一条消息再也不会被监听器所获取,除非是根据第一条消息的偏移量手动获取
把application.yml中的
enable-auto-commit
设置为
false
,设置为不自动提交
@Bean("ackContainerFactory")
public ConcurrentKafkaListenerContainerFactory ackContainerFactory(
ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory();
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(consumerFactory);
return factory;
}
@KafkaListener(id = "ack", topics = "ack",containerFactory = "ackContainerFactory")
public void ackListener(ConsumerRecord record, Acknowledgment ack) {
log.info("receive : " + record.value());
//手动提交
// ack.acknowledge();
}
实现消息转发
@KafkaListener(id = "forward", topics = "first_top4")
@SendTo("first_top2")
public String forward(String data) {
log.info("接收到消息数量:{}",data);
return "send msg : " + data;
}
启动关闭监听
@RestController
public class ConsumerContoller {
@Autowired
private KafkaListenerEndpointRegistry registry;
@Autowired
private ConsumerFactory consumerFactory;
@GetMapping("/stop")
public String stop(){
registry.getListenerContainer("forward").pause();
return "success";
@GetMapping("/start")
public String start(){
//判断监听容器是否启动,未启动则将其启动
if (!registry.getListenerContainer("forward").isRunning()) {
registry.getListenerContainer("forward").start();
registry.getListenerContainer("forward").resume();
return "success";
}
启动类要添加
@EnableKafka
配置消息过滤器
消息过滤器可以在消息抵达监听容器前被拦截,过滤器根据系统业务逻辑去筛选出需要的数据再交由KafkaListener处理。
/**
* 消息过滤
* @return
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory(
ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
//配合RecordFilterStrategy使用,被过滤的信息将被丢弃
factory.setAckDiscarded(true);
factory.setRecordFilterStrategy(new RecordFilterStrategy() {
@Override
public boolean filter(ConsumerRecord consumerRecord) {
String msg = (String) consumerRecord.value();
if(msg.contains("abc")){
return false;
log.info("filterContainerFactory filter : "+msg);
//返回true将会被丢弃
return true;
return factory;
}
public class FilterListener {
@KafkaListener(topics = {"filter_topic"},containerFactory="filterContainerFactory")
public void consumerBatch(ConsumerRecord<?, ?> record){
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("record =" + record);
log.info("接收到消息数量:{}",message);