public void testKafkaTemplate () {
ListenableFuture
> future = kafkaTemplate.send("topic", "{\"key\": \"value\"}");
CompletableFuture> completable = future.completable();
completable.whenCompleteAsync((n, e) -> {
if (null != e) {
System.out.println("发送报错了");
} else {
System.out.println("发送成功了!");
public void testKafkaTemplate1() {
ListenableFuture> future = kafkaTemplate.send("topic", "{\"key\": \"value\"}");
future.addCallback(new ListenableFutureCallback>() {
@Override
public void onFailure(Throwable throwable) {
@Override
public void onSuccess(SendResult stringStringSendResult) {
前言:今天看到有人说
kafka
生产者
在发送消息后,如果发生异常,异常捕获方法里拿不到消息的数据,我想了想,感觉不太对劲,所以验证了一下。
首先说下结论:
kafka
是不会在
生产者
发送消息的
回调
中,把发送的消息再一次返回回来的,因为这些消息我们可以自己记录,没必要浪费网络资源。
kafka
-client的
回调
方法
kafka
原生的
kafka
-client包中,
生产者
发送
回调
方法如下,其中Rec...
public class
Kafka
Send
ResultHandler implements ProducerListener {
private static final Logger log = LoggerFactory.getLogger(
Kafka
Send
ResultHandler.class);
*
kafka
发送成功
回调
* @param
package shangbo.
kafka
.example7;
import org.apache.
kafka
.clients.producer.RecordMetadata;
import org.springframework.context.ApplicationContext;
import org.springframework.cont...
其中topic,message是范指。
异步发送消息时,只要消息积累达到batch.size值或者积累消息的时间超过linger.ms(二者满足其一),producer就会把该批量的消息发送到topic中。
注:batch.size默认是16384,linger.ms默认是0
同步发送:
代码使用:
kafka
Template
.
send
(topic,message
private
Kafka
Template
<String, String>
kafka
Template
;
// 发送消息到指定主题
kafka
Template
.
send
("test_topic", "key", "value");
// 消费者
@
Kafka
Listener(topics = "test_topic")
public void receiveMessage(ConsumerRecord<?, ?> record) {
System.out.println("收到消息:" + record.value().toString());