  • a REST controller that writes to a Kafka topic, STREAM_TOPIC_IN_QQQ
  • a KafkaListener that reads from STREAM_TOPIC_IN_QQQ (groupId="bar") and logs
  • a KStream that peeks at the topic and logs it, converts it to another type, then writes it to STREAM_TOPIC_OUT_QQQ
  • another KafkaListener that reads from STREAM_TOPIC_OUT_QQQ.

    (I've been changing the suffix to avoid any possible confusion, and creating the topics by hand, because otherwise I was getting a warning, STREAM_TOPIC_IN_xxx=LEADER_NOT_AVAILABLE, and the stream would not run for a minute or so.)

    The first listener and the stream seem to be working, but when the listener on the STREAM_OUT_TOPIC tries to deserialize the message, I get the exception below. I am providing the serde in the stream with Produced.with. What do I need to do so that the listener knows the type to deserialize to?

    11 Mar 2019 14:34:00,194   DEBUG    [KafkaMessageController [] http-nio-8080-exec-1]   Sending a Kafka Message
    11 Mar 2019 14:34:00,236   INFO     [KafkaConfig [] kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1]   -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
    11 Mar 2019 14:34:00,241   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener: ConsumerRecord: {}ConsumerRecord(topic = STREAM_TOPIC_IN_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = com.teramedica.kafakaex001web.model.Greeting@7b6c8fcc)
    11 Mar 2019 14:34:00,243   INFO     [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1-producer]   Cluster ID: y48IEZaGQWKcWDVGf4mD6g
    11 Mar 2019 14:34:00,367   ERROR    [LoggingErrorHandler [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]   Error while processing: ConsumerRecord(topic = STREAM_TOPIC_OUT_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 48, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [ REDACTED ])], isReadOnly = false), key = 1, value = null)
    org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is java.lang.IllegalStateException: No type information in headers and no default type provided
        at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2.deserializationException(ErrorHandlingDeserializer2.java:204) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    

    Here's the configuration:

    REST (spring mvc):

    @RequestMapping("/greeting")
    public Greeting greeting(@RequestParam(value = "name", defaultValue = "World") String name) {
        Greeting gr = new Greeting(counter.incrementAndGet(), String.format(msgTemplate, name));
        this.kafkaTemplate.send(K9000Consts.STREAM_TOPIC_IN, "1", gr);
        logger.debug("Sending a Kafka Message");
        return gr;
    }

    Kafka Config (spring-kafka):

    @Bean
    public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) {
        KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN);
        stream.peek((k, greeting) -> {
                  logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent());
              })
              .map((k, v) -> new KeyValue<>(k, new GreetingResponse(v)))
              .to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class)));
        return stream;
    }

    @KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId="oofda", errorHandler = "myTopicErrorHandler")
    public void listenForGreetingResponse(ConsumerRecord<String, GreetingResponse> cr) throws Exception {
        logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString());
    }

    @KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar")
    public void listenForGreetingResponses(ConsumerRecord<String, Greeting> cr) throws Exception {
        logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString());
    }

    application.yml

    spring:
      kafka:
        bootstrap-servers: localhost:9092
        consumer:
          group-id: foo
          auto-offset-reset: latest
          key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
          value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
          properties:
            spring.json.trusted.packages: com.teramedica.kafakaex001web.model
            spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
            spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        streams:
          application-id: kafka9000-v0.1
          properties: # properties not explicitly handled by KafkaProperties.streams
            default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
            spring.json.trusted.packages: com.teramedica.kafakaex001web.model
      

    JsonDeserializer.VALUE_DEFAULT_TYPE: Fallback type for deserialization of values if no header information is present.

    The corresponding property name is spring.json.value.default.type.

    You can also set spring.json.use.type.headers (default true) to prevent even looking for headers.

    The deserializer automatically trusts the package of the default type so it's not necessary to add it there.
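
    The lookup order described above (type header first, then the default type, otherwise the exception from the question) can be sketched with a small model. This is a simplified illustration using only the JDK, not Spring's actual implementation; the class and method names are hypothetical, and trusted-package checks are left out.

```java
import java.util.Collections;
import java.util.Map;

// Simplified model of how a target type might be chosen; NOT Spring's real code.
class TypeResolutionSketch {

    // Header name as it appears in the ConsumerRecord output in this thread.
    static final String TYPE_ID_HEADER = "__TypeId__";

    /**
     * @param headers        record headers, modeled here as name -> string value
     * @param useTypeHeaders models spring.json.use.type.headers (default true)
     * @param defaultType    models spring.json.value.default.type, or null if unset
     */
    public static String resolveTargetType(Map<String, String> headers,
                                           boolean useTypeHeaders,
                                           String defaultType) {
        // 1. A type header wins when header lookup is enabled and the header exists.
        if (useTypeHeaders && headers.containsKey(TYPE_ID_HEADER)) {
            return headers.get(TYPE_ID_HEADER);
        }
        // 2. Otherwise fall back to the configured default type.
        if (defaultType != null) {
            return defaultType;
        }
        // 3. Neither source is available: this is the failure seen in the question.
        throw new IllegalStateException(
                "No type information in headers and no default type provided");
    }

    public static void main(String[] args) {
        Map<String, String> noHeaders = Collections.emptyMap();
        // A record without a type header resolves to the default type, if one is set.
        System.out.println(resolveTargetType(noHeaders, true, "com.example.GreetingResponse"));
        try {
            resolveTargetType(noHeaders, true, null);
        } catch (IllegalStateException e) {
            System.out.println("Failed: " + e.getMessage());
        }
    }
}
```

    In this model, the record on STREAM_TOPIC_OUT_QQQ corresponds to the last case: no type header and no default type configured.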

    However, also see Spring Messaging Message Conversion.

    Use a BytesDeserializer and BytesJsonMessageConverter and the framework will pass the method parameter type as the target for conversion.

    I should have specified that I want other than the default. Isn't it common to want to listen for different types on different topics? In any case, why is there no header information present in the first place? – mconner Mar 11, 2019 at 21:40

    Ah - ok - instead of a JsonDeserializer use a StringDeserializer (or BytesDeserializer) with a StringJsonMessageConverter (or BytesJsonMessageConverter); the Bytes... versions are more efficient. Then, the framework tells the converter the type found in the listener method. Again, see the documentation. – Gary Russell Mar 11, 2019 at 21:53

    If Boot auto-configuration finds a converter @Bean it will automatically wire it into the container factory. – Gary Russell Mar 11, 2019 at 21:58

    Ah. Yes, you can use Message<Greeting> or just Greeting. You can get other fields from the ConsumerRecord, e.g. "@Header(KafkaHeaders.OFFSET) long offset", if you need them (or message.getHeaders().get(...) if you take the first option). – Gary Russell Mar 12, 2019 at 21:19

    Seems to me that the BytesDeserializer, BytesJsonMessageConverter solution is the one most people would want to use, as using headers for deciding the type is only useful for Spring or at least Java producers. – raven Aug 14, 2021 at 5:54

    I was facing the same problem and fixed it this way.

    You have to set the following property to the class you are trying to deserialize to:

    spring.json.value.default.type=com.something.model.TransactionEventPayload
    

    I set the property on the @KafkaListener like this:

    @KafkaListener(topics = "topic", groupId = "myGroupId", properties = {"spring.json.value.default.type=com.something.model.TransactionEventPayload"})
    public void consumeTransactionEvent(@Payload TransactionEventPayload payload,
                           @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
                           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                           @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
        // handle the payload here
    }

    "Answering" my own question largely to consolidate the info in the comments to and from @GaryRussell, but basically, he provided the best answer. In short I did the following:

  • Set the consumer value deserializer to a StringDeserializer.
  • Add a messageConverter bean of type StringJsonMessageConverter.
  • In the KafkaListener-annotated methods, just use the expected type for the payload.
  • If using a ConsumerRecord in a KafkaListener-annotated method, do NOT expect its value to be of the payload type. It will now be a String (since the message converter, not the deserializer, is doing the conversion).
  • One other thing: by default, simply adding the messageConverter bean also adds it to the auto-configured kafkaTemplate when using Spring Boot auto-configuration. This doesn't seem to be an issue when calling kafkaTemplate.send(K9000Consts.STREAM_TOPIC_IN, "1", greeting), though I think it may be if using send(Message).

    Below is a working config, in that I get the messages as expected with minimal configuration.

    application.yml:

      spring:
        kafka:
          bootstrap-servers:  localhost:9092
          consumer:
            group-id: foo
            auto-offset-reset: latest
            key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
            value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
            properties:
              spring.json.trusted.packages: com.teramedica.kafakaex001web.model
              spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
              spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
          producer:
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
          streams:
            application-id: kafka9000-v0.1
            properties: # properties not explicitly handled by KafkaProperties.streams
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
              spring.json.trusted.packages: com.teramedica.kafakaex001web.model
    

    KafkaConfig:

        @Bean
        public RecordMessageConverter messageConverter() {
            return new StringJsonMessageConverter();
        }

        @Bean
        public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) {
            KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN);
            stream.peek((k, greeting) -> {
                      logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent());
                  })
                  .map((k, v) -> new KeyValue<>(k, new GreetingResponse(v)))
                  .to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class)));
            return stream;
        }

        @KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId="oofda", errorHandler = "myTopicErrorHandler")
        public void listenForGreetingResponse(GreetingResponse gr) throws Exception {
            logger.info("STREAM_OUT_TOPIC Listener : GreetingResponse is {}", gr);
        }

        @KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar")
        public void listenForGreetingResponses(@Payload Greeting gr,
                ConsumerRecord<String, String> record, // <--- NOTE: String, NOT Greeting
                @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) throws Exception {
            logger.info("STREAM_IN_TOPIC Listener:   Greeting: {}", gr.getContent());
            logger.info("STREAM_IN_TOPIC Listener:  From Headers: topic: {}, partition: {}, key: {}", topic, partition,
                        key);
            logger.info("STREAM_IN_TOPIC Listener:: From Record: topic: {}, parition: {}, key: {}",
                        record.topic(), record.partition(), record.key());
            logger.info("STREAM_IN_TOPIC Listener:: record value: {}, class: {}", record.value(), record.value().getClass());
        }

        @Bean
        public KafkaListenerErrorHandler myTopicErrorHandler() {
            return (m, e) -> {
                logger.error("Got an error {}", e.getMessage());
                return "some info about the failure";
            };
        }

    And output for a message is:

    13 Mar 2019 09:56:57,884   DEBUG    [KafkaMessageController [] http-nio-8080-exec-1]   Sending a Kafka Message
    13 Mar 2019 09:56:57,913   INFO     [KafkaConfig [] kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1]   -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
    13 Mar 2019 09:56:57,919   INFO     [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1-producer]   Cluster ID: 8nREAmTCS0SZT-NzWsCacQ
    13 Mar 2019 09:56:57,919   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:   Greeting: Hello, World!
    13 Mar 2019 09:56:57,920   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:   Record: ConsumerRecord(topic = STREAM_TOPIC_IN_SSS, partition = 0, offset = 23, CreateTime = 1552489017878, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 116, 101, 114, 97, 109, 101, 100, 105, 99, 97, 46, 107, 97, 102, 97, 107, 97, 101, 120, 48, 48, 49, 119, 101, 98, 46, 109, 111, 100, 101, 108, 46, 71, 114, 101, 101, 116, 105, 110, 103])], isReadOnly = false), key = 1, value = {"id":1,"content":"Hello, World!"})
    13 Mar 2019 09:56:57,920   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:  From Headers: topic: STREAM_TOPIC_IN_SSS, partition: 0, key: 1
    13 Mar 2019 09:56:57,920   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:: From Record: topic: STREAM_TOPIC_IN_SSS, parition: 0, key: 1
    13 Mar 2019 09:56:57,921   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:: record value: {"id":1,"content":"Hello, World!"}, class: class java.lang.String
    13 Mar 2019 09:56:58,030   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]   STREAM_OUT_TOPIC Listener : GreetingResponse id: 1000, response: Hello, World!, yourself
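
    The __TypeId__ header in the STREAM_IN_TOPIC record above is logged as raw byte values. As a small aside, the sketch below decodes those bytes using only the JDK, assuming the header value is UTF-8 text; the class name is hypothetical. It shows that the producer-side JsonSerializer wrote the fully qualified class name of the payload into the header.

```java
import java.nio.charset.StandardCharsets;

// Decodes the byte values of the __TypeId__ header as printed in the log output.
class TypeIdHeaderDecoder {

    // Byte values copied from the logged RecordHeader(key = __TypeId__, ...).
    static final int[] LOGGED_TYPE_ID = {
        99, 111, 109, 46, 116, 101, 114, 97, 109, 101, 100, 105, 99, 97, 46,
        107, 97, 102, 97, 107, 97, 101, 120, 48, 48, 49, 119, 101, 98, 46,
        109, 111, 100, 101, 108, 46, 71, 114, 101, 101, 116, 105, 110, 103
    };

    // Converts the logged integer values to bytes and reads them as UTF-8 text.
    public static String decode(int... values) {
        byte[] bytes = new byte[values.length];
        for (int i = 0; i < values.length; i++) {
            bytes[i] = (byte) values[i];
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // prints com.teramedica.kafakaex001web.model.Greeting
        System.out.println(decode(LOGGED_TYPE_ID));
    }
}
```

    A header of this kind is what the failing record on STREAM_TOPIC_OUT_QQQ in the question was missing.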
    

    This exception is thrown by org.springframework.kafka.support.serializer.JsonDeserializer, which requires type information to be included in a special type header, or provided to @KafkaListener via the spring.json.value.default.type configuration property.
    This is how I solved the issue in Spring Boot 2.5.3:

  • Add the ByteArrayJsonMessageConverter to the context:

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter;
    import org.springframework.kafka.support.converter.JsonMessageConverter;

    @Configuration
    public class JsonMessageConverterConfig {
        @Bean
        public JsonMessageConverter jsonMessageConverter() {
            return new ByteArrayJsonMessageConverter();
        }
    }

  • Set up app.kafka.producer.value-serializer and app.kafka.consumer.value-deserializer:

    app.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
    app.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

  • Now you can disable serialization of the TypeId header:

    spring.kafka.producer.properties.spring.json.add.type.headers=false

    This is not an answer; but might help people landing here from search engines.

    If you are facing this exception when running a Kafka Streams application, check the following:

  • Have you registered jsonSerde at all necessary places in your DSL?
  • Have you provided jsonSerde at state store instantiation?

    Note 1: Ensure you initialized jsonSerde as described below:

    import com.fasterxml.jackson.databind.JsonNode;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.connect.json.JsonDeserializer;
    import org.apache.kafka.connect.json.JsonSerializer;

    Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

    Note 2: The most common mistake is using the Spring JsonSerde without a target type:

    import org.springframework.kafka.support.serializer.JsonSerde;

    new JsonSerde<JsonNode>(); // This is wrong
    

    In my case, the issue was that I had already published messages of different types to the Kafka topic, and I was getting this exception.

    To fix it, I created a new topic and published messages there. Then I started a consumer on that topic and it worked fine.
