package com.example.demo.controller;
import com.example.demo.common.KafkaInfo;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutionException;
/**
 * REST endpoints under {@code /producer} that demonstrate three ways of sending a
 * message to {@link KafkaInfo#TOPIC_WELCOME} through {@link KafkaTemplate}:
 * fire-and-forget, synchronous (blocking on the send future) and asynchronous
 * (callback-based).
 */
@RestController
@RequestMapping("producer")
public class ProducerController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends a message and returns immediately; the future returned by
     * {@code send} is intentionally not inspected, so the outcome of the send
     * is not observed here.
     *
     * @return the literal string {@code "success"}
     */
    @PostMapping("fireAndForget")
    public String fireAndForget() {
        kafkaTemplate.send(KafkaInfo.TOPIC_WELCOME, "fireAndForget:" + LocalDateTime.now());
        return "success";
    }

    /**
     * Sends a message and blocks on the returned future until the send result
     * is available, then prints it.
     *
     * @return the literal string {@code "success"}
     * @throws ExecutionException   if the send completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @PostMapping("sync")
    public String sync() throws ExecutionException, InterruptedException {
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(KafkaInfo.TOPIC_WELCOME, "sync:" + LocalDateTime.now());
        // future.get() blocks the request thread until the send result is available.
        System.out.println("生产者(sync方式)返回值:" + future.get());
        System.out.println("生产者(sync方式)future.get之后");
        return "success";
    }

    /**
     * Sends a message and registers success/failure callbacks on the returned
     * future instead of waiting for it. The send result (or the failure cause)
     * is printed from the callback, so this method no longer blocks on
     * {@code future.get()} and the response is returned without waiting for the
     * outcome of the send.
     *
     * <p>The {@code throws} clause is retained only to keep the method signature
     * unchanged; the body does not block on the future.
     *
     * @return the literal string {@code "success"}
     * @throws ExecutionException   never thrown by the current body; kept for signature compatibility
     * @throws InterruptedException never thrown by the current body; kept for signature compatibility
     */
    @PostMapping("async")
    public String async() throws ExecutionException, InterruptedException {
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(KafkaInfo.TOPIC_WELCOME, "async:" + LocalDateTime.now());
        future.addCallback(
                result -> {
                    System.out.println("生产者(async方式)回调:发送成功");
                    // Report the result here rather than via a blocking future.get().
                    System.out.println("生产者(async方式)返回值:" + result);
                },
                error -> {
                    System.out.println("生产者(async方式)回调:发送失败");
                    // Without a blocking get(), this callback is the only place the cause surfaces.
                    System.out.println("生产者(async方式)失败原因:" + error);
                });
        System.out.println("生产者(async方式)发送完毕");
        return "success";
    }
}