This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer'

Producer properties

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Consumer properties

spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.group-id=user-group
server.port=8085

Consumer Service

@Service
public class UserConsumerService {
    @KafkaListener(topics = { "user-topic" })
    public void consumerUserData(User user) {
        System.out.println("Users Age Is: " + user.getAge() + " Fav Genre " + user.getFavGenre());
    }
}

Producer Service

@Service
public class UserProducerService {
    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void sendUserData(User user) {
        kafkaTemplate.send("user-topic", user.getName(), user);
    }
}

Producer Config for creating topic

@Configuration
public class KafkaConfig {
    @Bean
    public NewTopic topicOrder() {
        return TopicBuilder.name("user-topic").partitions(2).replicas(1).build();
    }
}

The producer works fine, but the consumer fails with this error:

2021-12-06 21:45:50.299 ERROR 4936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:149) ~[spring-kafka-2.8.0.jar:2.8.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1760) ~[spring-kafka-2.8.0.jar:2.8.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1283) ~[spring-kafka-2.8.0.jar:2.8.0]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition user-topic-0 at offset 1. If needed, please seek past the record to continue consumption.
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1507) ~[spring-kafka-2.8.0.jar:2.8.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1497) ~[spring-kafka-2.8.0.jar:2.8.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1325) ~[spring-kafka-2.8.0.jar:2.8.0]

I would be glad for any help; I am new to Kafka and trying to figure out why I am getting this error.

Are you sure that your consumer properties are correct? Please, show us really a consumer part and be sure that you use Deserializer there. – Artem Bilan Dec 6, 2021 at 21:22

Thanks. All good now on that side. Can we see more in stack trace about that error? I believe it should report the cause of such a SerializationException... – Artem Bilan Dec 6, 2021 at 21:29

Added more as i could, when i run consumer service in ide (vscode) ide freezes and does not let me even copy entire logs so I need to restart – Mixage Dec 6, 2021 at 21:32

Does the error message not tell you anything?

This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer

See the documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer

When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this problem, the ErrorHandlingDeserializer has been introduced. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a null value and a DeserializationException in a header that contains the cause and the raw bytes. When you use a record-level MessageListener, if the ConsumerRecord contains a DeserializationException header for either the key or value, the container’s ErrorHandler is called with the failed ConsumerRecord. The record is not passed to the listener.

You can use the DefaultKafkaConsumerFactory constructor that takes key and value Deserializer objects and wire in appropriate ErrorHandlingDeserializer instances that you have configured with the proper delegates. Alternatively, you can use consumer configuration properties (which are used by the ErrorHandlingDeserializer) to instantiate the delegates. The property names are ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS. The property value can be a class or class name. The following example shows how to set these properties:

// ... other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey");
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
return new DefaultKafkaConsumerFactory<>(props);
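
To make the delegation idea from the documentation concrete, here is a minimal plain-Java sketch of the pattern. It is not the real ErrorHandlingDeserializer and uses no Kafka or Spring classes: SafeDeserializer, DeserializationResult and the toy parseInt delegate are hypothetical names made up for illustration. The real class reports the failure through a record header rather than a result object, but the principle is the same: the wrapper catches the delegate's exception, so the failure becomes data that can be handled per record instead of an exception thrown from inside poll().

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Outcome of one deserialization attempt: either a value or the captured failure. */
final class DeserializationResult<T> {
    final T value;                 // null when deserialization failed
    final RuntimeException error;  // null when deserialization succeeded
    final byte[] rawBytes;         // original payload, kept for diagnostics

    private DeserializationResult(T value, RuntimeException error, byte[] rawBytes) {
        this.value = value;
        this.error = error;
        this.rawBytes = rawBytes;
    }

    static <T> DeserializationResult<T> success(T value, byte[] rawBytes) {
        return new DeserializationResult<>(value, null, rawBytes);
    }

    static <T> DeserializationResult<T> failure(RuntimeException error, byte[] rawBytes) {
        return new DeserializationResult<>(null, error, rawBytes);
    }

    boolean failed() {
        return error != null;
    }
}

/** Hypothetical stand-in for the wrapper: delegates, and never lets the delegate's exception escape. */
final class SafeDeserializer<T> {
    private final Function<byte[], T> delegate;

    SafeDeserializer(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    DeserializationResult<T> deserialize(byte[] data) {
        try {
            return DeserializationResult.success(delegate.apply(data), data);
        } catch (RuntimeException e) {
            // Capture the cause and the raw bytes instead of propagating the exception.
            return DeserializationResult.failure(e, data);
        }
    }
}

final class SafeDeserializerDemo {
    /** Toy delegate: parses the payload as a decimal integer; throws NumberFormatException on bad input. */
    static Integer parseInt(byte[] data) {
        return Integer.valueOf(new String(data, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        SafeDeserializer<Integer> safe = new SafeDeserializer<>(SafeDeserializerDemo::parseInt);
        for (String payload : new String[] {"42", "not-a-number", "7"}) {
            DeserializationResult<Integer> r = safe.deserialize(payload.getBytes(StandardCharsets.UTF_8));
            if (r.failed()) {
                // A bad record is reported and skipped; the loop keeps consuming.
                System.out.println("skipped bad record: " + r.error);
            } else {
                System.out.println("processed " + r.value);
            }
        }
    }
}
```

In this sketch the bad payload in the middle does not stop the loop, which is the behaviour the wrapper is meant to give a consumer that would otherwise keep failing on the same offset.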

With Boot:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer

thanks Garry, it allowed me to see real exception without freezing issue in ide finally. now i face "Class not found" on consumer and i think your answer will help me to fix it too from stackoverflow.com/questions/68987866/… – Mixage Dec 6, 2021 at 21:46

In my case, the consumer was a generic listener like the following:

@KafkaListener(
    topics = "${spring.kafka.topics.foo}",
    containerFactory = "foo")
public <T extends DomainEvent> void listen(List<T> domainEvents, Acknowledgment acknowledgment) {
    // ...
}

The error was raised for a new subtype, ChildFoo2, which I had added and which extends DomainEvent. I had forgotten to register the new child class as a JsonSubType on the parent class.

Adding it as follows was the real fix in my case:

@JsonSubTypes({@Type(ChildFoo1.class), @Type(ChildFoo2.class)})
public class DomainEvent {
    // ...
}
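
Why a missing registration surfaces as a deserialization failure can be shown with a toy model in plain Java. This is not Jackson's implementation: SubtypeRegistry and its methods are hypothetical, the event classes are empty placeholders borrowing the names above, and the sketch assumes the hierarchy is resolved through name-based type ids, which the answer does not show.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Placeholder hierarchy reusing the names from the answer.
class DomainEvent {}
class ChildFoo1 extends DomainEvent {}
class ChildFoo2 extends DomainEvent {}

/** Toy model of a subtype table: maps a type id found in the payload to a concrete class. */
final class SubtypeRegistry {
    private final Map<String, Supplier<? extends DomainEvent>> factories = new HashMap<>();

    SubtypeRegistry register(String typeId, Supplier<? extends DomainEvent> factory) {
        factories.put(typeId, factory);
        return this;
    }

    DomainEvent create(String typeId) {
        Supplier<? extends DomainEvent> factory = factories.get(typeId);
        if (factory == null) {
            // An unregistered id cannot be mapped to a class, so deserialization fails here.
            throw new IllegalArgumentException("Unknown subtype id: " + typeId);
        }
        return factory.get();
    }
}

final class SubtypeRegistryDemo {
    public static void main(String[] args) {
        // Only ChildFoo1 is registered, mirroring the state before the fix.
        SubtypeRegistry registry = new SubtypeRegistry().register("ChildFoo1", ChildFoo1::new);
        try {
            registry.create("ChildFoo2");
        } catch (IllegalArgumentException e) {
            System.out.println("before fix: " + e.getMessage());
        }

        // Registering the new subtype mirrors adding it to @JsonSubTypes.
        registry.register("ChildFoo2", ChildFoo2::new);
        System.out.println("after fix: " + registry.create("ChildFoo2").getClass().getSimpleName());
    }
}
```

In a real listener the equivalent lookup failure happens inside the value deserializer, which is how an incomplete subtype list ends up reported as a SerializationException on the consumer.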
