public void testKafkaTemplate () { ListenableFuture> future = kafkaTemplate.send("topic", "{\"key\": \"value\"}"); CompletableFuture> completable = future.completable(); completable.whenCompleteAsync((n, e) -> { if (null != e) { System.out.println("发送报错了"); } else { System.out.println("发送成功了!"); public void testKafkaTemplate1() { ListenableFuture> future = kafkaTemplate.send("topic", "{\"key\": \"value\"}"); future.addCallback(new ListenableFutureCallback>() { @Override public void onFailure(Throwable throwable) { @Override public void onSuccess(SendResult stringStringSendResult) {
前言:今天看到有人说 kafka 生产者 在发送消息后,如果发生异常,异常捕获方法里拿不到消息的数据,我想了想,感觉不太对劲,所以验证了一下。 首先说下结论: kafka 是不会在 生产者 发送消息的 回调 中,把发送的消息再一次返回回来的,因为这些消息我们可以自己记录,没必要浪费网络资源。 kafka -client的 回调 方法 kafka 原生的 kafka -client包中, 生产者 发送 回调 方法如下,其中Rec...
public class Kafka Send ResultHandler implements ProducerListener { private static final Logger log = LoggerFactory.getLogger( Kafka Send ResultHandler.class); * kafka 发送成功 回调 * @param package shangbo. kafka .example7; import org.apache. kafka .clients.producer.RecordMetadata; import org.springframework.context.ApplicationContext; import org.springframework.cont... 其中topic,message是范指。 异步发送消息时,只要消息积累达到batch.size值或者积累消息的时间超过linger.ms(二者满足其一),producer就会把该批量的消息发送到topic中。 注:batch.size默认是16384,linger.ms默认是0 同步发送: 代码使用: kafka Template . send (topic,message private Kafka Template <String, String> kafka Template ; // 发送消息到指定主题 kafka Template . send ("test_topic", "key", "value"); // 消费者 @ Kafka Listener(topics = "test_topic") public void receiveMessage(ConsumerRecord<?, ?> record) { System.out.println("收到消息:" + record.value().toString());