引入依赖 (可以不指定版本)
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.1</version>
</dependency>
如果启动报错
Caused by: java.lang.NoClassDefFoundError: org/springframework/core/log/LogAccessor
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.<init>(KafkaListenerAnnotationBeanPostProcessor.java:148)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:172)
... 19 common frames omitted
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration': Unexpected exception during bean creation; nested exception is java.lang.TypeNotPresentException: Type org.springframework.kafka.listener.CommonErrorHandler not present
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:555) ~[spring-beans-5.3.15.jar:5.3.15]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.15.jar:5.3.15]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.15.jar:5.3.15]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.15.jar:5.3.15]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.15.jar:5.3.15]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:953) ~[spring-beans-5.3.15.jar:5.3.15]
就把指定版本去掉
配置文件yml
修改kafka连接地址 其他按需修改
#kafka的topic名称
kafkaTopic: topic-test
spring:
kafka:
bootstrap-servers: 192.168.1.12:9092 #kafka连接地址
producer:
acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
batch-size: 16384 #批量大小
properties:
linger.ms: 0 # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
buffer-memory: 33554432 #生产端缓冲区大小
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: defaultConsumerGroup # 默认的消费组ID
enable-auto-commit: true # 是否自动提交offset
## 当kafka中没有初始offset或offset超出范围时将自动重置offset
## earliest:重置为分区中最小的offset;
## latest:重置为分区中最新的offset(消费分区中新产生的数据);
## none:只要有一个分区不存在已提交的offset,就抛出异常;
auto-commit-interval:
ms: 1000
auto-offset-reset: latest
properties:
session.timeout.ms: 120000 # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
request.timeout.ms: 180000 # 消费请求超时时间
listener:
missing-topics-fatal: false # 消费监听接口监听的主题不存在时,自动创建,true时表示如果不存在启动报错
flyway:
connect-retries: 0 #重试次数
如果设置了账号密码,那就需要配置账号密码
sasl.mechanism: PLAIN
security.protocol: SASL_PLAINTEXT
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="用户名" password="密码"; #分号不能去
不加分号会报错
java.lang.IllegalArgumentException: JAAS config entry not terminated by semi-colon
at org.apache.kafka.common.security.JaasConfig.parseAppConfigurationEntry(JaasConfig.java:121) ~[kafka-clients-3.0.0.jar:na]
消费者也有的话 ,消费者也配置
KafkaConsumer.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
* @author yvioo
@Component
public class KafkaConsumer {
* 消费监听
* @param record
@KafkaListener(topics = "${kafkaTopic}")
public void onMessage(ConsumerRecord<?, ?> record){
System.out.println("收到消息:topic名称:"+record.topic()+",分区:"+record.partition()+",值:"+record.value());
KafkaProducer.java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
* @author 。
@RestController
public class KafkaProducer {
@Value("${kafkaTopic}")
private String kafkaTopic;
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
* 发送消息
* @param message
@GetMapping("/send")
public void sendMessage1(String message) {
kafkaTemplate.send(kafkaTopic, message);
* 有发送结果回调
* @param message
@GetMapping("/send/callback")
public void sendMessage3(String message) {
kafkaTemplate.send(kafkaTopic, message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("fail:"+ex.getMessage());
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("success:topic名称:" + result.getRecordMetadata().topic() + ",分区:"
+ result.getRecordMetadata().partition() + ",消息在分区中的标识:" + result.getRecordMetadata().offset());
自定义发送的分区器
MyPartitioner.java
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
* 自定义分区器
* @author 。
public class MyPartitioner implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
//设置分区逻辑
return 0;
@Override
public void close() {
@Override
public void configure(Map<String, ?> map) {
配置文件增加 后面跟类全路径
partitioner.class: com.example.kafka.config.MyPartitioner #自定义分区器
很常见的场景就是我们希望下单、支付消息有顺序,这样以订单ID作为key发送消息就达到了分区有序性的目的。
KafkaProducerInterceptor.java
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
* 生产者拦截器
* @author 。
public class KafkaProducerInterceptor implements ProducerInterceptor<String,String> {
@Override
public void configure(Map<String, ?> map) {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
//这里可以改变发送的数据,比如加个时间戳
return new ProducerRecord<>(record.topic(),record.partition(),record.key(),System.currentTimeMillis()+"_"+record.value());
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (recordMetadata != null) {
//发送成功
System.out.println("发送成功");
}else {
System.out.println("发送失败");
@Override
public void close() {
Kerberos证书连接配置文件方式
properties:
java.security.auth.login.config: /kerberos/kafka_kafka_jaas.conf
java.security.krb5.conf: /kerberos/krb5.conf
sasl.mechanism: GSSAPI
security.protocol: SASL_PLAINTEXT
sasl.kerberos.service.name: kafka
sasl.jaas.config: com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/kerberos/kafka.keytab" principal="kafka/admin@ETAIN.COM"; #分号不能少
public static void connectKafka() {
//准备JAAS配置文件路径
String kafkaClientJaasFile = "/kerberos/kafka_kafka_jaas.conf";
// Kerberos配置文件路径
String krb5FilePath = "/kerberos/krb5.conf";
// .keytab证书文件路径
String keyTabPath = "/kerberos/dsmm_kafka.keytab";
// Kerberos Principal
String principal = "kafka/admin@DFS.COM";
System.setProperty("java.security.auth.login.config", kafkaClientJaasFile);
System.setProperty("java.security.krb5.conf", krb5FilePath);
Properties props = new Properties();
props.setProperty("bootstrap.servers", "102.123.23.11:9092");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//使用kerberos安全认证连接kafka
props.setProperty("security.protocol", "SASL_PLAINTEXT");
props.setProperty("sasl.mechanism", "GSSAPI");
props.setProperty("sasl.kerberos.service.name", "kafka");
props.setProperty(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required\n" +
" useKeyTab=true\n" +
" storeKey=true\n" +
" keyTab=\"" + keyTabPath + "\"\n" +
" principal=\"" + principal + "\";");
* 从Kafka topic中消费消息
props.setProperty("topic", "test");
//设置消费的位置,earliest表示从头开始消费,latest表示从最新的位置开始消费
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor("test");
for (PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
kafkaConsumer.assign(topicPartitions);
enable.auto.commit
指定了消费者是否自动提交偏移量,默认值是true,为了尽量避免重复数据和数据丢失,可以把它设置为false,有自己控制合适提交偏移量,如果设置为true, 可以通过设置 auto.commit.interval.ms属性来控制提交的频率。
详细地来说:
当一个consumer因某种原因退出Group时,进行重新分配partition后,同一group中的另一个consumer在读取该partition时,怎么能够知道上一个consumer该从哪个offset的message读取呢?也是是如何保证同一个group内的consumer不重复消费消息呢?上面说了一次走网络的fetch请求会拉取到一定量的数据,但是这些数据还没有被消息完毕,Consumer就挂掉了,下一次进行数据fetch时,是否会从上次读到的数据开始读取,而导致Consumer消费的数据丢失吗?
为了做到这一点,当使用完poll从本地缓存拉取到数据之后,需要client调用commitSync方法(或者commitAsync方法)去commit 下一次该去读取 哪一个offset的message。
而这个commit方法会通过走网络的commit请求将offset在coordinator中保留,这样就能够保证下一次读取(不论进行了rebalance)时,既不会重复消费消息,也不会遗漏消息。
对于offset的commit,Kafka Consumer Java Client支持两种模式:由KafkaConsumer自动提交,或者是用户通过调用commitSync、commitAsync方法的方式完成offset的提交。
session.timeout.ms
该属性指定了当消费者被认为已经挂掉之前可以与服务器断开连接的时间。默认是3s,消费者在3s之内没有再次向服务器发送心跳,那么将会被认为已经死亡。此时,协调器将会出发再均衡,把它的分区分配给其他的消费者,该属性与heartbeat.interval.ms紧密相关,该参数定义了消费者发送心跳的时间间隔,也就是心跳频率,一般要同时修改这两个参数,heartbeat.interval.ms参数值必须要小于session.timeout.ms,一般是session.timeout.ms的三分之一,比如,session.timeout.ms设置成3min,那么heartbeat.interval.ms一般设置成1min,这样,可以更快的检测以及恢复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡(有一种情况就是网络延迟,本身消费者是没有挂掉的,但是网络延迟造成了心跳超时,这样本不该发生再均衡,但是因为网络原因造成了非预期的再均衡),把该属性的值设置得大一些,可以减少意外的再均衡,不过检测节点崩愤-需要更长的时间。
max.partition.fetch.bytes
该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是lMB , 也
就是说,kafkaConsumer.poll() 方法从每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节。如果一个主题有20 个分区和5 个消费者,那么每个消费者需要至少4MB 的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生奔溃,剩下的消费者需要处理更多的分区。max.partition.fetch.bytes 的值必须比broker 能够接收的最大消息的字节数(通过max.message.size 属性配置)大, 否则消费者可能无法读取这些消息,导致消费者一直挂起重试,例如,max.message.size设置为2MB,而该属性设置为1MB,那么当一个生产者可能就会生产一条大小为2MB的消息,那么就会出现问题,消费者能从分区取回的最大消息大小就只有1MB,但是数据量是2MB,所以就会导致消费者一直挂起重试。
在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用poll()方法
来避免会话过期和发生分区再均衡,如果单次调用poll()返回的数据太多,消费者需要更多的时间来处理,可能无怯及时进行下一个轮询来避免会话过期。如果出现这种情况, 可以把max.partitioin.fetch.bytes 值改小,或者延长会话过期时间。
fetch.min.bytes
消费者从服务器获取记录的最小字节数,broker收到消费者拉取数据的请求的时候,如果可用数据量小于设置的值,那么broker将会等待有足够可用的数据的时候才返回给消费者,这样可以降低消费者和broker的工作负载。
因为当主题不是很活跃的情况下,就不需要来来回回的处理消息,如果没有很多可用数据,但消费者的CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低broker 的工作负载。
fetch.max.wait.ms
fetch.min.bytes设置了broker返回给消费者最小的数据量,而fetch.max.wait.ms设置的则是broker的等待时间,两个属性只要满足了任何一条,broker都会将数据返回给消费者,也就是说举个例子,fetch.min.bytes设置成1MB,fetch.max.wait.ms设置成1000ms,那么如果在1000ms时间内,如果数据量达到了1MB,broker将会把数据返回给消费者;如果已经过了1000ms,但是数据量还没有达到1MB,那么broker仍然会把当前积累的所有数据返回给消费者。
max.poll.records
控制单次调用call方法能够返回的记录数量,帮助控制在轮询里需要处理的数据量。
receive.buffer.bytes + send.buffer.bytes
socket 在读写数据时用到的TCP 缓冲区也可以设置大小。如果它们被设为-1 ,就使用操作系统的默认值。如果生产者或消费者与broker 处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
partition.assignment.strategy
分区分配策略,kafka有两个默认策略:
Range:该策略会把主题的若干个连续的分区分配给消费者
Robin:该策略把主题的所有分区逐个分配给消费者