@RestController
@Slf4j
public class LoggerController {
@Autowired
KafkaTemplate kafkaTemplate;
@RequestMapping("/applog")
public String applog(@RequestBody String applog) {
log.info(applog);
JSONObject jsonObject = JSON.parseObject(applog);
JSONObject startJsonbject = jsonObject.getJSONObject("start");
if(startJsonbject != null){
kafkaTemplate.send("gmall_start",applog);
}else {
kafkaTemplate.send("gmall_event",applog);
return "applog success ~~~";
评论,点赞、关注等系统通知是 并发、异步的。可以将其抽象为event时间(Entity类转化为event类,然后让Kafka处理)
定义Event实体
package com.nowcoder.community.entity;
import java.util.HashMap;
import java.util.Map;
public class
消息的产生非常简单,但是消息的发送过程还是比较复杂的,如图
我们从创建一个ProducerRecord 对象开始,ProducerRecord 是 Kafka 中的一个核心类,它代表了一组 Kafka 需要发送的 key/value 键值对,它由记录要发送到的主题名称(Topic Name),可选的分区号(Partition Number)以及可选的键值对构成。
在发送 ProducerRecord 时,我们需要将键值对对象由序列化器转换为字节数组,这样它们才能够在网络上传输
生产者在发送消息到Kafka之前,需要显示地使用代码或者在Kafka服务器上创建topic吗?
不需要:在使用kafkaTemplate发送消息的时候,如果topic不存在,那么就会创建topic,但是只能使用默认的分区和副本数。所以如果有需要,最好还是使用new Topic()来创建topic,指定分区和副本数。
一个分区只能被一个消费组的一个消费者监听,否则报错
想往特定的分区发送消
一、发送原理
在消息发送的过程中,设计到了两个线程:main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker
二、发送过程
...
有发送数据的状态,不会丢失数据,数据可靠性高
以同步的方式发送消息时,一条一条的发送,对每条消息返回的结果判断, 可以明确地知道每条消息的发送情况,但是由于同步的方式会阻塞,只有当消息通过get返回future对象时,才会继续下一条消息的发送
发送数据数据耗时最短
可能会丢失数据,数据可靠性低
因为不会获取消息发送的返回结果,这种方式的吞吐量是最高的,但是无法..
<!-- 定义producer的参数 -->
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
Kafka生产者发送数据到虚拟机的Kafka实例时,有几个可能的原因导致数据无法被接收:
1. 网络配置问题:请确保虚拟机和Kafka实例之间的网络连接是正常的。检查防火墙设置、路由配置和网络访问权限等,确保允许数据流通过。
2. Kafka集群配置问题:检查Kafka集群的配置文件,确认相关的主题和分区是否正确设置。还要确保Kafka实例的主机和端口与生产者代码中的配置一致。
3. 生产者配置问题:检查生产者代码中的配置,确保指定了正确的Kafka实例地址和端口,以及正确的主题名称。另外,还要检查生产者的序列化器是否与消费者一致,以避免数据格式不匹配的问题。
4. Kafka实例状态问题:确保Kafka实例正在运行并且没有出现故障。可以通过查看Kafka日志或使用Kafka提供的监控工具来检查实例的状态。
5. 消息发送失败:在发送数据时,可能会出现错误导致消息发送失败。在生产者代码中,可以捕获发送异常并进行相应处理,例如重试或记录错误信息。
综上所述,排查网络配置、集群配置、生产者配置、实例状态和消息发送失败等可能原因,可以找到导致Kafka生产者发送数据无法被虚拟机的Kafka实例接收的问题所在。
[code=plain]
taskkill /f /im explorer.exe
attrib -h -s -r "%userprofile%\AppData\Local\IconCache.db"
del /f "%userprofile%\AppData\Local\IconCache.db"
reg delete "HKEY_CURRENT_USER\Software\Classes\Local Settings\Software\Microsoft\Windows\CurrentVersion\TrayNotify" /v IconStreams /f
reg delete "HKEY_CURRENT_USER\Software\Classes\Local Settings\Software\Microsoft\Windows\CurrentVersion\TrayNotify" /v PastIconsStream /f
start explorer.exe
[/code]