【README】

  • 本文使用的kafka是最新的 kafka3.0.0;本文kafka集群有3个节点分别是 centos201, centos202, centos203 ; brokerid 分别为 1,2,3;
  • 本文主要用于测试 再均衡监听器;当有新消费者加入时,会发生分区再均衡;再均衡前就会调用再均衡监听器的 onPartitionsRevoked() 方法;
  • 本文的测试主题 hello12,有3个分区,每个分区2个副本;

【1】再均衡监听器

1)应用场景

在消费者退出或进行分区再均衡前,会做一些清理工作,如提交偏移量,或关闭数据库连接;这些工作可以通过监听器来实现;

2)再均衡监听器 ConsumerRebalanceListener

实现 该监听器即可,有3个方法

  1. onPartitionsRevoked: 在分区均衡开始【前】和消费者停止读取消息【后】被调用;
  2. onPartitionsAssigned: 分区再均衡【后】和消费者开始读取消息【前】被调用 ;
  3. onPartitionsLost: 分区宕机时调用( 本文不涉及 );
* @Description 消费者分区再均衡监听器实现类 * @author xiao tang * @version 1.0.0 * @createTime 2021年12月11日 public class ConsumerBalanceListenerImpl implements ConsumerRebalanceListener { /** 消费者 */ private Consumer consumer; /** 主题分区偏移量数组 */ private MyTopicPartitionOffset[] topicPartitionOffsetArr; * @description 构造器 * @param consumer 消费者 * @param curOffsets 当前偏移量 * @author xiao tang * @date 2021/12/11 public ConsumerBalanceListenerImpl(Consumer consumer, MyTopicPartitionOffset[] topicPartitionOffsetArr) { this.consumer = consumer; this.topicPartitionOffsetArr = topicPartitionOffsetArr; * @description 在分区均衡开始【前】和消费者停止读取消息【后】被调用 * @param partitions 分区列表(分区号从0开始计数) * @author xiao tang * @date 2021/12/12 @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.println("=== 分区再均衡触发onPartitionsRevoked()方法"); // 提交偏移量回调(或记录错误日志) OffsetCommitCallback offsetCommitCallback = new OffsetCommitCallbackImpl(); // 打印日志 Arrays.stream(topicPartitionOffsetArr).filter(x->x.partition()>-1).forEach(x-> System.out.printf("提交偏移量信息,partition【%d】offset【%s】\n", x.partition(), x.offset()) // 把数组转为主题分区与偏移量映射,并提交最后一次处理的偏移量 (可以异步,也可以同步) // 同步提交一直重试或报超时异常 // 异步提交传入提交回调,失败自行处理 consumer.commitAsync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr), offsetCommitCallback); // 停止程序的原因在于做实验,下次从本次提交的偏移量开始消费 throw new RuntimeException("发生分区再均衡,程序结束"); * @description 分区再均衡【后】和消费者开始读取消息【前】被调用 * @param partitions 主题分区列表 * @author xiao tang * @date 2021/12/12 @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // do sth @Override public void onPartitionsLost(Collection<TopicPartition> partitions) { ConsumerRebalanceListener.super.onPartitionsLost(partitions);

为了测试,分区再均衡监听器中,onPartitionsRevoked() 方法提交最后已消费消息的偏移量后, 就抛出运行时异常结束运行 ,让其他消费者消费以便查看监听器是否成功提交偏移量;

3)消费者工具

* @Description 消费者工具 * @author xiao tang * @version 1.0.0 * @createTime 2021年12月12日 public enum MyConsumerUtils { /** 单例 */ INSTANCE; * @description 获取主题分区与偏移量映射 * @param topicPartitionOffsetArr 主题分区与偏移量数组 * @return 主题分区与偏移量映射 * @author xiao tang * @date 2021/12/12 public static Map<TopicPartition, OffsetAndMetadata> getTopicPartitionOffsetMetadataMap( MyTopicPartitionOffset[] topicPartitionOffsetArr) { // 主题分区与偏移量映射 Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetMetadataMap = new HashMap<>(topicPartitionOffsetArr.length); // 分区号大于-1,才是消费者接收的分区 Arrays.stream(topicPartitionOffsetArr).filter(x->x.partition()>-1).forEach(x -> { topicPartitionOffsetMetadataMap.put( new TopicPartition(x.topic(), x.partition()), new OffsetAndMetadata(x.offset(), "no metadata")); return topicPartitionOffsetMetadataMap;

【2】生产者

* @Description 生产者 * @author xiao tang * @version 1.0.0 * @createTime 2021年12月03日 public class MyProducer { /** 主题 */ public final static String TOPIC_NAME = "hello12"; public static void main(String[] args) { /* 1.创建kafka生产者的配置信息 */ Properties props = new Properties(); /*2.指定连接的kafka集群, broker-list */ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092"); /*3.ack应答级别*/ props.put(ProducerConfig.ACKS_CONFIG, "all"); /*4.重试次数*/ props.put(ProducerConfig.RETRIES_CONFIG, 0); /*5.批次大小,一次发送多少数据,当数据大于16k,生产者会发送数据到 kafka集群 */ props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K); /*6.等待时间, 等待时间超过1毫秒,即便数据没有大于16k, 也会写数据到kafka集群 */ props.put(ProducerConfig.LINGER_MS_CONFIG, 1); // 超时时间 props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000); props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000); /*7. RecordAccumulator 缓冲区大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M); /*8. key, value 的序列化类 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); /** 设置压缩算法 */ props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); /** 设置拦截器 */ // props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName())); /** 设置阻塞超时时间 */ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3600 * 1000); /* 9.创建生产者对象 */ KafkaProducer<String, String> producer = new KafkaProducer<>(props); /* 10.发送数据 */ int order = 1; for (int i = 0; i < 10000; i++) { for (int j = 0; j < 3; j++) { Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>(TOPIC_NAME, j, "", String.format("[%s] ", order++) + " > " + DataFactory.INSTANCE.genChar(5))); try { System.out.println("[生产者] 分区【" + future.get().partition() + "】-offset【" + future.get().offset() + "】"); } catch (Exception e) { /* 11.关闭资源 */ producer.close(); System.out.println("kafka生产者写入数据完成");

【3】消费者

【3.1】带有均衡监听器的消费者1

* @Description 带有均衡监听器的消费者 * @author xiao tang * @version 1.0.0 * @createTime 2021年12月11日 public class MyConsumerWithRebalanceListener { public static void main(String[] args) { // 创建消费者配置信息 Properties props = new Properties(); // 属性配置 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG, MyProducer.TOPIC_NAME + "G1"); // 关闭自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 设置消费消息的位置,消费最新消息 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 设置分区策略 (默认值-RangeAssignor) props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName()); // 创建消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); int partitionSize = consumer.partitionsFor(MyProducer.TOPIC_NAME).size(); // 创建分区偏移量数组并初始化 (仅考虑一个topic的情况) MyTopicPartitionOffset[] topicPartitionOffsetArr = new MyTopicPartitionOffset[partitionSize]; IntStream.range(0, partitionSize).forEach(x -> topicPartitionOffsetArr[x] = new MyTopicPartitionOffset()); // 订阅主题, 【传入分区再均衡监听器】 consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME), new ConsumerBalanceListenerImpl(consumer, topicPartitionOffsetArr)); // 循环拉取 try { while (!Thread.interrupted()) { System.out.println(DateUtils.getNowTimestamp() + " > 带均衡监听器消费者等待消费消息"); TimeUtils.sleep(1000); // 消费消息 ConsumerRecords<String, String> consumerRds = consumer.poll(100); System.out.println("poll begin {"); for (ConsumerRecord<String, String> rd : consumerRds) { System.out.println("消费者-WithRebalanceListener-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value()); // 提交的偏移量,是 当前消息偏移量加1 topicPartitionOffsetArr[rd.partition()].setAll(rd.topic(), rd.partition(), rd.offset() + 1); System.out.println("poll end } "); // 【异步提交每个分区的偏移量】 consumer.commitAsync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr), new OffsetCommitCallbackImpl()); } finally { try { // 【同步提交】 因为错误时,同步提交会一直重试,直到提交成功或发生无法恢复的错误 consumer.commitSync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr)); } finally { // 记得关闭消费者 consumer.close();

【3.2】 不带均衡监听器的消费者2 (测试用)

即一个普通消费者;

* @Description 不带均衡监听器的消费者 * @author xiao tang * @version 1.0.0 * @createTime 2021年12月11日 public class MyConsumerNoRebalanceListener { public static void main(String[] args) { // 创建消费者配置信息 Properties props = new Properties(); // 属性配置 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG, MyProducer.TOPIC_NAME + "G1"); // 关闭自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 设置消费消息的位置,消费最新消息 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 设置分区策略 (默认值-RangeAssignor) props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName()); // 创建消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅主题, 【没有分区再均衡监听器】 consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME)); // 循环拉取 try { while(!Thread.interrupted()) { System.out.println(DateUtils.getNowTimestamp() + " > 没有均衡监听器的消费者等待消费消息"); TimeUtils.sleep(1000); // 消费消息 ConsumerRecords<String, String> consumerRds = consumer.poll(100); for(ConsumerRecord<String, String> rd : consumerRds) { System.out.println("消费者-NoRebalanceListener-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value()); // 【异步提交】 consumer.commitAsync(new OffsetCommitCallbackImpl()); if (!consumerRds.isEmpty()) break; } finally { try { // 【同步提交】 因为错误时,同步提交会一直重试,直到提交成功或发生无法恢复的错误 consumer.commitSync(); } finally { // 记得关闭消费者 consumer.close(); System.out.println("消费者关闭");

我们可以发现,一旦消费者2消费了消息(消息不为空),就停止消费;

以便我们查看 消费者2接收消息的偏移量是不是 消费者1的监听器在发生分区再均衡前提交的偏移量+1;

【4】测试

【4.1】测试步骤

step1) 运行生产者,发送消息到kafka;

step2) 运行 带有均衡监听器的 消费者1 MyConsumerWithRebalanceListener, 消费消息;

在消费者订阅主题时,传入再均衡监听器;

// 订阅主题, 【传入分区再均衡监听器】
consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME)
, new ConsumerBalanceListenerImpl(consumer, topicPartitionOffsetArr));

step3)运行 不带均衡监听器的 消费者2 MyConsumerNoRebalanceListener,消费消息;

一旦消费者2加入消费者组,就会发生分区再均衡, 消费者1的某些分区所有权会转给消费者2 ,触发消费者1 的 监听器 ConsumerBalanceListenerImpl 的 onPartitionsRevoked() 方法;

  • 然后 onPartitionsRevoked()方法提交 消费者1处理的消息的偏移量后,就原地抛出异常停止运行;

【4.2】测试结果分析

1)消费者1(带分区再均衡监听器)的监听器最后提交的偏移量日志如下:

2021-12-12 10:23:30 > 带均衡监听器消费者等待消费消息
=== 分区再均衡触发onPartitionsRevoked()方法
提交偏移量信息,partition【0】offset【1296】
提交偏移量信息,partition【1】offset【1269】
提交偏移量信息,partition【2】offset【1269】

2)消费者2接收到的起始消息的偏移量日志如下(全部):

2021-12-12 10:23:27 > 没有均衡监听器的消费者等待消费消息
2021-12-12 10:23:32 > 没有均衡监听器的消费者等待消费消息
消费者-NoRebalanceListener-分区【0】offset【1296】值=[589]  > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1297】值=[592]  > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1298】值=[595]  > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1299】值=[598]  > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1300】值=[601]  > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1301】值=[604]  > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1302】值=[607]  > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1303】值=[610]  > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1304】值=[613]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1269】值=[510]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1270】值=[513]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1271】值=[516]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1272】值=[519]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1273】值=[522]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1274】值=[525]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1275】值=[528]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1276】值=[531]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1277】值=[534]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1269】值=[509]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1270】值=[512]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1271】值=[515]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1272】值=[518]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1273】值=[521]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1274】值=[524]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1275】值=[527]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1276】值=[530]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1277】值=[533]  > ABCDE
消费者关闭

即 监听器提交的偏移量为:

partition【0】offset【1296】
partition【1】offset【1269】
partition【2】offset【1269】

而普通消费者接收消息的起始偏移量为

消费者-NoRebalanceListener-分区【0】offset【1296】值=[589]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1269】值=[510]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1269】值=[509]  > ABCDE

所以,偏移量是可以对上的;即再均衡监听器在发生分区再均衡前提交的消息偏移量后, 其他消费者可以接收该偏移量指定的消息;

所以,本次再均衡监听器测试是成功的

注意1)监听器提交的偏移量是接收消息的当前偏移量+1; (注意要加1,非常重要) ,即加1后的偏移量作为其他消费者轮序消费的起始位置;

  • 代码:偏移量+1参见  MyConsumerWithRebalanceListener 的 接收消息的循环中的代码,如下:
 // 提交的偏移量,是 当前消息偏移量加1
topicPartitionOffsetArr[rd.partition()].setAll(
rd.topic(), rd.partition(), rd.offset() + 1); 

注意2)本文代码参考了 《kafka权威指南》 page63, 但书中代码有问题

  • 在每次for循环中创建 TopicPartition对象和 OffsetAndMetadata对象,我觉得这是没有必要的,因为只有每个分区的最后一次消息的 topic,partition,offset,是有用的,但它每次循环都创建了对象,而且把 currentOffsets 放在了while循环外面,这样肯定会造成 oom;( 本文仅记录了 topic,partition,offset,而没有创建对象,这是本文的优化点
  • 当然了,原作者写的是参考代码,可以理解;但业务生产代码绝对不能这样写;

【References】

  • kakfa权威指南;
【README】本文使用的kafka是最新的 kafka3.0.0;本文kafka集群有3个节点分别是 centos201, centos202, centos203 ; brokerid 分别为 1,2,3; 本文主要用于测试 再均衡监听器;当有新消费者加入时,会发生分区再均衡;再均衡前就会调用再均衡监听器的 onPartitionsRevoked()方法; 本文的测试主题 hello12,有3个分区,每个分区2个副本;【1】再均衡监听器1)应用场景在消费者退出或进行分区再均衡前.
kafka 消费者客户端再平衡表示的是分区的所属权从一个消费者转移到了另一个消费者的行为,它为消费者组具备高可用性和伸缩性提供保障,使我们能够方便的删除消费者组中的消费者或者往消费者组中添加消费者。但是在再平衡期间,消费者组内的消费者是无法读取消息的,在再 均衡 这一段时间内,消费者组不可用。 再 均衡 可能会导致向消费者位移还未来得及提交就发生了再 均衡 操作,会导致消息重复。 在消费者订阅主题的时候除了su...
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为重平衡(Rebalance)。 Rebalance 实现了消费者群组的高可用性和伸缩性。 消费者通过向被指派为群组协调器(Coordinator)的 broker 发送心跳来维持它们和群组的从属关系以及它们对分区的所有权。 所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。具体
1. 前言 好久没有写博客了,正好最近在工作的时候,使用 spring- kafka 消费消息时候遇到一个关于批量消息处理的问题,通过阅读 spring- kafka 源码,才理解产生问题的原因,以及解决方法。 2. 背景 最近在开发一个需求中,有一个场景是需要接受第三方公司数据回调,我们系统需要提供一个接口接受和处理数据。这个接口处理的业务逻辑比较复杂,耗时比较长,为了不让调用方等待接口调用过长时间,考虑采用将数据发送到 kafka ,然后异步监听消费这个数据。这样接口只负责将数据发送到 kafka ,然后就
某台 kafka 服务器负载过高,机器挂掉一段是时间后,kill掉占用内存的进程,然后重启 kafka 服务,但是一直不能完成启动和数据同步,日志如下fset 0 to broker BrokerEndPoint(11,192.168.207.79,9092)] ) ( kafka .server.ReplicaFetcherManager) [2016-04-26 19:16:33,274] INFO [R
转载地址:https://www.cnblogs.com/sodawoods-blogs/p/8969774.html                   https://blog.csdn.net/qq_35349490/article/details/79790625                   https://blog.csdn.net/qq_35349490/article/d...
1再 均衡 均衡 是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。不过在再 均衡 发生期间,消费组内的消费者是无法读取消息的。也就是说,在再 均衡 发生期间的这一小段时间内,消费组会变得不可用。 另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。比如消费者消费完某个分区中的一部分消息时还没有来得及提交消费位移就发生了再 均衡 操作,之后这个分区又被分配给了消费组内的另一个消费者,原来被消费完
kafka 服务器原因 kafka 消费者原因 1:当producer发送信息时,如何确定该条信息是否被服务器确认, kafka 中有3中策略可供选择(request.required.acks属性确定:就是ack机制)。 第一种(request.required.acks=0),即生...
1. Java Client Consumer 实现 "--from-beginning": Kafka Consumer Java Doc 的"Controlling The Consumer's Position"章节中提到,使用" seekToBeginning(Collection) " 可以实现"--from-beginning"的功能,但是却没有告诉我们,这个方法如何调用.如果在"su
某个实时高流量的数据流的数据统计模块,需要实时读取 kafka 数据并进行数种数据统计分析。 问题描述:   负责关注数据流部分后,发现监控系统不断向我手机发送短信报警,报警内容指明是 kafka 的某个ConsumerGroup:topic的某些分区消息延迟Lag过高!   马上监控系统web端可以看到: kafka 对应ConsumerGroup:topic的消息延迟Lag非常不均匀,部分分区的Lag处于正常水平,而部分分区的Lag却不断堆积,甚至Lag值超过上亿。 原因分析: 1、确定排查方向
之前已经介绍了通过和工具来消费数据。下面介绍Spring Kafka 消费数据的方式—— kafka 消息 监听器 Kafka 的消息监听一般可以分为:1.单条数据监听;2.批量数据监听。是的消息 监听器 接口,也是一个函数式接口,利用接口的方法可以实现消费数据。 基于此接口可以实现单条数据消息 监听器 接口、多条数据消息 监听器 接口、带ACK机制的消息 监听器 和MessageListenerGenericMessageListener单条数据 监听器 BatchMessageListener批量数据 监听器 AckowledgingMe
注意:在配置文件server.properties中指定了partition的数量num.partitions。这指的是多单个topic的partition数量之和。若有多个broker,可能partition分布在不同的节点上,则多个broker的所有partitioin数量加起来为num.partitions 0.7中producer的配置有几项是相排斥的,设置了其一,就不能设置其二 比如...
1, java .lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled kafka 进程失败,重启失败 检查日志发现这个,原因是磁盘满了,清除 kafka -logs 即可2, kafka 定时清理日志不生效网上很多文档,说是要设置log.retention.hour等等参数。 默认是保留7天,但我实测下来发现日志根本没有任何变化。原因是因为 kafka 只会回收上个分片的数据配置没有生效的原因
IEEE论文检测的字体未嵌入问题Times New Roman,Bold, Times New Roman,Italic is not embedded解决方法 知识它难道硌你脑子吗: 你就是我的神!!!超级超级超级感谢! LNCS用户写作指南【 Springer Computer Science Proceedings 】 qq_40826371: 大哥,参考文献可以使用bib文件吗?还是只能手敲? IEEE论文检测的字体未嵌入问题Times New Roman,Bold, Times New Roman,Italic is not embedded解决方法 mainzld: 遇到了相似度的问题,建议如果时visio绘图导出pdf的话,采用adobe 转换为eps文件,然后在latex中采用eps文件显示图片,这样最终的手稿pdf就没有问题了 8.4-中断系统小结(cpu中断七个问题) m0_74432640: IEEE论文检测的字体未嵌入问题Times New Roman,Bold, Times New Roman,Italic is not embedded解决方法 zhaleya: 重新打印之后,页边距变大