往kafka发送消息,提供三种构造函数形参:

-- ProducerRecord(topic, partition, key, value)
-- ProducerRecord(topic, key, value)
-- ProducerRecord(topic, value)
<1> 若指定Partition ID,则PR被发送至指定Partition;
<2> 若未指定Partition ID,但指定了Key, PR会按照hasy(key)发送至对应Partition;
<3> 若既未指定Partition ID也没指定Key,PR会按照round-robin模式发送到每个Partition;
<4> 若同时指定了Partition ID和Key, PR只会发送到指定的Partition (Key不起作用,代码逻辑决定)。

1、kafka生产者同步发送数据

public class KafkaProducerTest {
    private static Properties kafkaProps = new Properties();
     * 初始化一些配置信息
    public static void initProperty(){
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("retries", 3);
        kafkaProps.put("acks", "all");
        kafkaProps.put("client.id", "zsd");
     * 往kafka同步发送消息
    public static   void  syncSend() throws ExecutionException, InterruptedException {
        initProperty();
        // 创建kafka的生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(kafkaProps);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic1", "key", "value");
        Future<RecordMetadata> future = kafkaProducer.send(record);
        RecordMetadata recordMetadata = future.get();
        System.out.println("offset:" + recordMetadata.offset()
                + "\npartition:" + recordMetadata.partition()
                + "\ntopic:" + recordMetadata.topic()
                + "\nserializedKeySize:" + recordMetadata.serializedKeySize()
                + "\nserializedValueSize:" + recordMetadata.serializedValueSize()
        kafkaProducer.close();

2、kafka生产者异步发送数据

* kafka生产者往topic中异步发送消息 public static void asyncSend(){ initProperty(); // 创建kafka的生产者 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(kafkaProps); ProducerRecord<String, String> record = new ProducerRecord<>("topic2", "key", "value"); kafkaProducer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println("offset:" + recordMetadata.offset() + "\npartition:" + recordMetadata.partition() + "\ntopic:" + recordMetadata.topic() + "\nserializedKeySize:" + recordMetadata.serializedKeySize() + "\nserializedValueSize:" + recordMetadata.serializedValueSize() if(e == null){ System.out.println("hello"); }); kafkaProducer.close();

3、spring boot中的kafka生产者发送数据(利用KafkaTemplate )

@RestController
@Slf4j
public class LoggerController {
    @Autowired // IOC注入
    KafkaTemplate kafkaTemplate;
    @RequestMapping("/applog")
    public String applog(@RequestBody String  applog) {
        log.info(applog);
        // json解析
        JSONObject jsonObject = JSON.parseObject(applog);
        JSONObject startJsonbject = jsonObject.getJSONObject("start");
        if(startJsonbject != null){
            // 不为空,则为启动日志
            kafkaTemplate.send("gmall_start",applog);
        }else {
            // 事件日志
            kafkaTemplate.send("gmall_event",applog);
        return  "applog success ~~~";
评论,点赞、关注等系统通知是 并发、异步的。可以将其抽象为event时间(Entity类转化为event类,然后让Kafka处理)
定义Event实体
package com.nowcoder.community.entity;
import java.util.HashMap;
import java.util.Map;
public class 
消息的产生非常简单,但是消息的发送过程还是比较复杂的,如图
我们从创建一个ProducerRecord 对象开始,ProducerRecord 是 Kafka 中的一个核心类,它代表了一组 Kafka 需要发送的 key/value 键值对,它由记录要发送到的主题名称(Topic Name),可选的分区号(Partition Number)以及可选的键值对构成。
在发送 ProducerRecord 时,我们需要将键值对对象由序列化器转换为字节数组,这样它们才能够在网络上传输
生产者在发送消息到Kafka之前,需要显示地使用代码或者在Kafka服务器上创建topic吗?
不需要:在使用kafkaTemplate发送消息的时候,如果topic不存在,那么就会创建topic,但是只能使用默认的分区和副本数。所以如果有需要,最好还是使用new Topic()来创建topic,指定分区和副本数。
一个分区只能被一个消费组的一个消费者监听,否则报错
想往特定的分区发送消
				
一、发送原理 在消息发送的过程中,设计到了两个线程:main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker 二、发送过程 ...
发送数据的状态,不会丢失数据,数据可靠性高 以同步的方式发送消息时,一条一条的发送,对每条消息返回的结果判断, 可以明确地知道每条消息的发送情况,但是由于同步的方式会阻塞,只有当消息通过get返回future对象时,才会继续下一条消息的发送 发送数据数据耗时最短 可能会丢失数据,数据可靠性低 因为不会获取消息发送的返回结果,这种方式的吞吐量是最高的,但是无法.. <!-- 定义producer的参数 --> <bean id="producerProperties" class="java.util.HashMap"> <constructor-arg> <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
Kafka生产者发送数据到虚拟机的Kafka实例时,有几个可能的原因导致数据无法被接收: 1. 网络配置问题:请确保虚拟机和Kafka实例之间的网络连接是正常的。检查防火墙设置、路由配置和网络访问权限等,确保允许数据流通过。 2. Kafka集群配置问题:检查Kafka集群的配置文件,确认相关的主题和分区是否正确设置。还要确保Kafka实例的主机和端口与生产者代码中的配置一致。 3. 生产者配置问题:检查生产者代码中的配置,确保指定了正确的Kafka实例地址和端口,以及正确的主题名称。另外,还要检查生产者的序列化器是否与消费者一致,以避免数据格式不匹配的问题。 4. Kafka实例状态问题:确保Kafka实例正在运行并且没有出现故障。可以通过查看Kafka日志或使用Kafka提供的监控工具来检查实例的状态。 5. 消息发送失败:在发送数据时,可能会出现错误导致消息发送失败。在生产者代码中,可以捕获发送异常并进行相应处理,例如重试或记录错误信息。 综上所述,排查网络配置、集群配置、生产者配置、实例状态和消息发送失败等可能原因,可以找到导致Kafka生产者发送数据无法被虚拟机的Kafka实例接收的问题所在。
[code=plain] taskkill /f /im explorer.exe attrib -h -s -r "%userprofile%\AppData\Local\IconCache.db" del /f "%userprofile%\AppData\Local\IconCache.db" reg delete "HKEY_CURRENT_USER\Software\Classes\Local Settings\Software\Microsoft\Windows\CurrentVersion\TrayNotify" /v IconStreams /f reg delete "HKEY_CURRENT_USER\Software\Classes\Local Settings\Software\Microsoft\Windows\CurrentVersion\TrayNotify" /v PastIconsStream /f start explorer.exe [/code]