相关文章推荐
曾深爱过的蛋挞  ·  libpthread.so.0: ...·  1 年前    · 
英姿勃勃的针织衫  ·  failed to convert ...·  2 年前    · 

Spring Boot Kafka - 序列化和反序列化JSON

在Spring Boot应用集成Kafka读写消息 一文中说明了如何通过Spring Kafka来发送和接收字符串消息。

本文描述了如何通过Spring Kafka如何在发送消息时序列化JSON(将对象转换成JSON字节流),如何在接收数据时反序列化JSON(将JSON字节流转换成对象)。

配置JsonSerializer和JsonDeserializer

application.yaml 中配置JsonSerializer和JsonDeserializer。

server:
  port: 8080
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: demo-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: '*'
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  • Consumer的value-deserializer 配置为 org.springframework.kafka.support.serializer.JsonDeserializer,表示在接收消息时用JSON反序列化。
  • 配置spring.kafka.consumer.properties.spring.json.trusted.packages='*' 表示接受反序列化任意的类,也可限定包路径。
  • Producer的value-serializer 配置为org.springframework.kafka.support.serializer.JsonSerializer,表示在发送消息时用JSON序列化。

定义一个Model类

定义一个User类。

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
    private String firstName;
    private String lastName;
    private int age;
  • 通过Lombok注解来简化POJO的样板代码。

Producer类

定义一个UserProducer类。

@Service
@RequiredArgsConstructor
@Slf4j
public class UserProducer {
    private final KafkaTemplate<String, User> kafkaTemplate;
    public void createUser(User user) {
        log.info(String.format("#### -> Producing message -> %s", user));
        kafkaTemplate.send(KafkaConstants.TOPIC_USERS, user);
  • KafkaTemplate<String, User> 表示message key为String,message value为User对象 (对象被序列化后为JSON字节流)。

Consumer类

定义一个UserConsumer类。

@Service
@Slf4j
public class UserConsumer {
    @KafkaListener(topics = KafkaConstants.TOPIC_USERS, groupId = KafkaConstants.CONSUMER_GROUP)
    public void consume(User user) {
        log.info(String.format("#### -> Consumed message -> %s", user));
  • consume() 方法的参数为User对象(JSON字节流被反序列化为对象)。

Controller类

定义一个UserController类用来作API测试。

@RestController
@RequestMapping("/users")
@RequiredArgsConstructor
public class UserController {
    private final UserProducer userProducer;
    @PostMapping
    public void createUser(@RequestBody User user) {
        userProducer.createUser(user);

用HttPie工具来测试:

http :8080/users firstName="Cat" lastName="Tommy" age=3
http :8080/users firstName="Mouse" lastName="Jackie" age=2

此时如果再测试之前的例子在Spring Boot应用集成Kafka读写消息 ,也一样成功。

本文代码示例:

文章目录Spring Boot Kafka - 序列和反序列化JSON前言配置JsonSerializer和JsonDeserializer定义一个Model类Producer类Consumer类Controller类测试小结参考文档Spring Boot Kafka - 序列和反序列化JSON前言在在Spring Boot应用集成Kafka读写消息 一文中说明了如何通过Spring Kafka来发送和接收字符串消息。本文描述了如何通过Spring Kafka如何在发送消息时序列化JSON(将对象转
kafka在生产消息发送到broker之前要经过序列化的过程,消费者在消费消息前,消息会经过反序列化序列化反序列化的配置在application.yml可以通过下面方式配置。 spring: kafka: producer: #生产者key的序列化器 key-serializer: org.apache.kafka.common.serialization.IntegerSerializer #生产者value的序列化
Kafka的样本Spring Boot 公开了一个使用json消息并发布到kafka主题中的api 要通过泊坞窗启动zookeeper和kafka,请执行以下操作: docker-compose up 生成应用程序映像 docker build --no-cache -t sample_kafka:0.0.1 . -f docker\Dockerfile 创建主题: bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test 要开始制作,请执行以下操作: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 要启动消费者: 将 zoo_sample.cfg 重命名为 zoo.cfg 编辑 zoo.cfg, 指定 dataDir 运行 cmd, 进入 bin`目录,输入`zkServer.cmd,启动 zookeeper. kafka 安装 运行 cmd, 输入 输入bin\windows\kafka-server-start.bat config\server.properties,启动 kafka 运行以下命令 kafka-topics.bat --create --zookeeper localhost:2181 --replica
带有Kafka Consumer实例的Spring Boot 本项目介绍如何将Spring BootSpring Kafka结合使用以使用Kafka主题中的JSON / String消息 启动Zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties 启动Kafka服务器 bin/kafka-server-start.sh config/server.properties 创建Kafka主题 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Kafka_Example bin/kafka-topics.sh --create --zookeeper loc
import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.RetriableException; import org.slf4j.Logger; import
今天在启动Kafka时,出现了一些问题。Kafka启动后,卡在了某一消费点,报 Missing exception handling for deserialization of key values,提示缺少对键值反序列的异常处理,并且系统一直重复反序列化该调记录,一直失败,陷入死循环。 上网查询解决办法发现这种现象是当生产者序列化程序和消费者反序列化程序不兼容时产生的一种毒丸场景。在以下场景可能会发生毒丸现象: 生产者更改了键或值序列化器,并
1. 概述 Kafka传输自定义的DTO对象时,不能像平时一样使用StringSerializer和StringDeserializer。这种情况需要自己实现对应DTO的序列化器和反序列化器。 假设现在有个 KafkaMsgDto 类,代码如下: @Data public class KafkaMsgDto { private String id; private ActionEnum action; public KafkaMsgDto(){ public
Azure Event Hubs Kafka示例 这是一个使用Spring BootSpring Kafka通过Kafka协议连接到Azure Event Hub的最小示例。 您可以使用以下指南在Azure上创建启用了Kafka的事件中心: 创建一个名为test-topic事件(又称为Event Hub) test-topic 现在,您必须配置以下两个Spring属性,例如,在application.yml 。 event-hubs-connection-string: "Endpoint=sb://sth-event-hubs.servicebus.windows.net/;SharedAccessKeyName=sth-policy;SharedAccessKey=xxxxxx" event-hubs-fqdn: "sth-event-hubs.servicebus.win
java -jar target/spring-kafka-protobuf-0.0.1-SNAPSHOT.jar curl -v 'http://localhost:8080/rest/n/spring/kafka/protobuf/demo' 查看并检查弹簧日志 Spring Kafka概述 Spring提供了 Spring-Kafka 项目来操作 Kafka。 https://spring.io/projects/spring-kafka 我们先对 Kafka-Spring 做个快速入门,实现 Producer发送消息 ,同时Consumer 消费消息。 为了方便测试验证 ,我们把生产者和消费者都写到一个工程里面。 import com.alibaba.fastjson.JSONObject; Map<String, Object> map=(Map<String, Object>)JSONObject.parse(jsonStr); Map转换成JSON import com.alibaba.fastjson.JSONObject; Strin...
spring-boot-kafka-consumer-example是一个使用Spring Boot框架和Apache Kafka消息队列的消费者示例。该示例演示了如何使用Spring Kafka进行消费者端开发,以便从Kafka中接收消息。 在该示例中,首先需要添加Spring Kafka和Apache Kafka依赖项。然后可以开始编写消费者代码。通过定义一个KafkaListener注释方法,可以指定需要消费消息的主题和分组。 当收到消息时,该方法将被自动调用,其中消息将作为参数传递。通过处理消息,可以实现所需的业务逻辑。 在Spring Boot应用程序中,可以使用@SpringBootApplication注释指定启动类,并使用@EnableKafka注释启动应用程序中的Kafka支持。通过将一些配置属性添加到application.properties文件中,可以配置用于连接Kafka的属性,如Kafka服务地址和端口。 使用spring-boot-kafka-consumer-example可以快速开始Kafka消费者开发,并且由于Spring Boot的优点,开发过程变得更加高效,并且应用程序可靠性也得到了提高。