相关文章推荐
老实的人字拖  ·  Postgres: ...·  8 月前    · 
坏坏的青蛙  ·  C#.Net ...·  1 年前    · 

简介

说明

本文用示例介绍如何使用SpringBoot整合Kafka。

版本

SpringBoot与kafka版本对应关系:​ ​Spring for Apache Kafka​

Kafka--SpringBoot--整合/使用/教程/实例_spring

实例

依赖及配置

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Version is managed by spring-boot-starter-parent (2.4.2 -> spring-kafka 2.6.x). -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- NOTE(review): lombok is excluded from the repackaged jar here,
                         but no lombok dependency is declared above — harmless leftover
                         from the Spring Initializr template; safe to remove. -->
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

application.yml

spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
    consumer:
      # Required. Clients sharing a group-id share the subscription: a record
      # consumed by one member of the group is not delivered to the others.
      group-id: consumer_group_id
      enable-auto-commit: true
      auto-commit-interval: 1000
      # earliest: with a committed offset, resume from it; without one, read from the beginning
      # latest (default; use in production): with a committed offset, resume from it; without one, consume only newly produced records
      # none: resume from committed offsets only; throw if any subscribed partition has no committed offset
      auto-offset-reset: latest

      #client-id: consumer1
server:
  port: 8081

生产者

package com.example.demo.controller;

import com.example.demo.common.KafkaInfo;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.util.concurrent.ExecutionException;

/**
 * REST endpoints demonstrating the three Kafka producer send styles:
 * fire-and-forget, synchronous (blocking on the returned future), and
 * asynchronous (callback-based). All endpoints publish to
 * {@link KafkaInfo#TOPIC_WELCOME} and return the literal {@code "success"};
 * the HTTP response is unrelated to the broker acknowledgement.
 */
@RestController
@RequestMapping("producer")
public class ProducerController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /** Fire-and-forget: send the record and ignore whether it ever arrives. */
    @PostMapping("fireAndForget")
    public String fireAndForget() {
        kafkaTemplate.send(KafkaInfo.TOPIC_WELCOME, "fireAndForget:" + LocalDateTime.now());
        return "success";
    }

    /**
     * Synchronous send: {@code future.get()} blocks until the broker acks,
     * so the second println only runs after the send completed.
     *
     * @throws ExecutionException   if the send fails
     * @throws InterruptedException if interrupted while waiting for the ack
     */
    @PostMapping("sync")
    public String sync() throws ExecutionException, InterruptedException {
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(KafkaInfo.TOPIC_WELCOME, "sync:" + LocalDateTime.now());
        System.out.println("生产者(sync方式)返回值:" + future.get());
        System.out.println("生产者(sync方式)future.get之后");
        return "success";
    }

    /**
     * Asynchronous send: register success/failure callbacks instead of blocking.
     * The trailing {@code future.get()} is kept only so the demo can print the
     * {@link SendResult}; it makes this request effectively synchronous —
     * drop it for a truly async producer.
     *
     * @throws ExecutionException   if the send fails
     * @throws InterruptedException if interrupted while waiting for the ack
     */
    @PostMapping("async")
    public String async() throws ExecutionException, InterruptedException {
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(KafkaInfo.TOPIC_WELCOME, "async:" + LocalDateTime.now());
        future.addCallback(
                o -> System.out.println("生产者(async方式)回调:发送成功"),
                error -> System.out.println("生产者(async方式)回调:发送失败"));
        System.out.println("生产者(async方式)发送完毕");
        // Fix: this log line was mislabeled "sync方式" (copy-paste from sync()).
        System.out.println("生产者(async方式)返回值:" + future.get());

        return "success";
    }
}

消费者

package com.example.demo.consumer;

import com.example.demo.common.KafkaInfo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer for {@link KafkaInfo#TOPIC_WELCOME}: prints the payload and
 * every message header to stdout.
 */
@Component
public class Consumer {

    @KafkaListener(topics = KafkaInfo.TOPIC_WELCOME)
    public String consumer2(@Payload String message, @Headers MessageHeaders headers) {
        System.out.println("消费者(注解方式):收到消息==> ");
        System.out.println(" message:" + message);
        System.out.println(" headers:");
        for (String key : headers.keySet()) {
            System.out.println(" " + key + ":" + headers.get(key));
        }
        return "success";
    }

    // An equivalent listener signature: take the raw record instead of
    // @Payload/@Headers, e.g.
    //   @KafkaListener(topics = KafkaInfo.TOPIC_WELCOME)
    //   public String consumer1(ConsumerRecord<String, String> consumerRecord) { ... }
}

其他代码

package com.example.demo.common;

/**
 * Shared Kafka naming constants.
 *
 * Converted from a constant interface (an anti-pattern: interfaces exist to
 * define types, not to export constants) to a non-instantiable utility class;
 * existing {@code KafkaInfo.TOPIC_WELCOME} references compile unchanged.
 */
public final class KafkaInfo {

    private KafkaInfo() {
        // Constants holder — no instances.
    }

    // Topic names must not contain '@' — the broker rejects them, hence the underscore.
    public static final String TOPIC_WELCOME = "topic_welcome";
}
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/** Spring Boot entry point. */
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        // Equivalent to the static shorthand SpringApplication.run(DemoApplication.class, args).
        new SpringApplication(DemoApplication.class).run(args);
    }
}

测试

首先,要启动zookeeper、kafka服务。

发送并忘记

把消息发送给服务器,但并不关心它是否正常到达。

访问:​ ​http://localhost:8081/producer/fireAndForget​

后端结果:

消费者(注解方式):收到消息==> 
message:fireAndForget:2021-02-14T20:50:14.626
headers:
kafka_offset:22
kafka_consumer:org.apache.kafka.clients.consumer.KafkaConsumer@1adf894d
kafka_timestampType:CREATE_TIME
kafka_receivedPartitionId:0
kafka_receivedTopic:topic_welcome
kafka_receivedTimestamp:1613307014653
kafka_groupId:consumer_group_id

同步发送

调用 get() 方法进行等待。

访问:​ ​http://localhost:8081/producer/sync​

后端结果:(1.get()方法是同步的 2.生产者收到的返回值与消费者的返回值无关)

生产者(sync方式)返回值:SendResult [producerRecord=ProducerRecord(topic=topic_welcome, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=sync:2021-02-14T20:51:42.636, timestamp=null), recordMetadata=topic_welcome-0@23]
生产者(sync方式)future.get之后
消费者(注解方式):收到消息==>
message:sync:2021-02-14T20:51:42.636
headers:
kafka_offset:23
kafka_consumer:org.apache.kafka.clients.consumer.KafkaConsumer@1adf894d
kafka_timestampType:CREATE_TIME
kafka_receivedPartitionId:0
kafka_receivedTopic:topic_welcome
kafka_receivedTimestamp:1613307102636
kafka_groupId:consumer_group_id

异步发送

指定一个回调函数,服务器在返回响应时调用该函数。

访问:​ ​http://localhost:8081/producer/async​

后端结果:(1.回调函数是异步的 2.生产者收到的返回值与消费者的返回值无关)

生产者(async方式)发送完毕
生产者(async方式)回调:发送成功
生产者(sync方式)返回值:SendResult [producerRecord=ProducerRecord(topic=topic_welcome, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=async:2021-02-14T20:59:31.499, timestamp=null), recordMetadata=topic_welcome-0@26]
消费者(注解方式):收到消息==>
message:async:2021-02-14T20:59:31.499
headers:
kafka_offset:26
kafka_consumer:org.apache.kafka.clients.consumer.KafkaConsumer@7ed9ff13
kafka_timestampType:CREATE_TIME
kafka_receivedPartitionId:0
kafka_receivedTopic:topic_welcome
kafka_receivedTimestamp:1613307571522
kafka_groupId:consumer_group_id
springboot ControllerAdvice不起效

学习在 Spring Boot 如何使用 @ControllerAdvice 注解。它其实是 Spring MVC 提供的功能,是一个增强的 Controller ,主要可以实现三个方面的功能:全局异常处理、全局数据绑定、全局数据预处理。1 全局异常处理@ControllerAdvice public class MyControllerAdvice { @ExceptionHandler