Collectives™ on Stack Overflow

Find centralized, trusted content and collaborate around the technologies you use most.

Learn more about Collectives

Teams

Q&A for work

Connect and share knowledge within a single location that is structured and easy to search.

Learn more about Teams

I am trying to integrate Kafka in my project. I have 2 spring applications, one has to be configured as a producer and the other one as a consumer. I am trying to send a greeting message. Greeting is a class having two field, it will be seen below. But I keep getting this error in the consumer app console:

2021-11-23 13:31:53.412 ERROR 8452 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.2.jar:2.7.2]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.2.jar:2.7.2]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1598) ~[spring-kafka-2.7.2.jar:2.7.2]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.2.jar:2.7.2]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition greetingTopic-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'assignment2.kafka.Greeting' is not in the trusted packages: [java.util, java.lang, ro.tuc.ds2020.kafkaconsumer, ro.tuc.ds2020.kafkaconsumer.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

The code for the Kafka producer configuration:

package assignment2.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    @Bean
    public ProducerFactory<String, Greeting> greetingProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:assignment2.kafka.Greeting");
        return new DefaultKafkaProducerFactory<>(configProps);
    @Bean
    public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
        return new KafkaTemplate<>(greetingProducerFactory());

I have the following code for the Kafka consumer configuration:

package ro.tuc.ds2020.kafkaconsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                "sensorGroup");
        props.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>  kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonSerializer.TYPE_MAPPINGS, "greeting:assignment2.kafka.Greeting");
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "assignment2.kafka.Greeting");
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class));
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(greetingConsumerFactory());
        return factory;

And the Greeting class, which is present in both applications. On the consumer app, in the ro.tuc.ds2020.kafkaconsumer package and on the producer part, on the assignment2.kafka package.

public class Greeting {
    private String msg;
    private String name;
    public Greeting() {
    public Greeting(String msg, String name) {
        this.msg = msg;
        this.name = name;
    public String getMsg() {
        return msg;
    public void setMsg(String msg) {
        this.msg = msg;
    public String getName() {
        return name;
    public void setName(String name) {
        this.name = name;
    @Override
    public String toString() {
        return msg + ", " + name + "!";

I suspect there is something wrong with my configuration, I found a similar solved question here: Spring Boot Kafka Consumer throwing error in loop . I have added the specific lines that solved this person error, but unfortunately I cannot make it work. I believe it is related to the fact that I have two separate applications, so the object of type Greeting that is send from producer to consumer is not in the same package. Thank you! Hope someone can help

@jmizv Yes, I have modified the question. You can find the code for the Greeting class now. – Gloria Nov 23, 2021 at 15:04 You are mapping the wrong type on the consumer side props.put(JsonSerializer.TYPE_MAPPINGS, "greeting:assignment2.kafka.Greeting"); - it should be the consumer greeting type there. – Gary Russell Nov 23, 2021 at 15:39

You are mapping the wrong type on the consumer side...

props.put(JsonSerializer.TYPE_MAPPINGS, "greeting:assignment2.kafka.Greeting");

Needs to be the consumer version.

I have modified the configuration for greetingConsumeFactory() in this way: props.put(JsonSerializer.TYPE_MAPPINGS, "greeting:ro.tuc.ds2020.kafkaconsumer.Greeting"); props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "ro.tuc.ds2020.kafkaconsumer.Greeting"); but the error still appears – Gloria Nov 23, 2021 at 15:59 That doesn't make sense. Oh - you have set ADD_TYPE_INFO_HEADERS to false; it needs to be true to convey headers. But it should still succeed because of the default type config. Please post an MCRE someplace and I'll take a look to see what's wrong. – Gary Russell Nov 23, 2021 at 16:20 Thank you so much for your help. I have found here: stackoverflow.com/questions/54638846/… that I can set the useHeadersIfPresent from JsonDeserializer to false and it seems to be working for me. – Gloria Nov 23, 2021 at 16:29

I solved this problem modifying the configuration in the following way: return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class, false)); In the last line in the greetingConsumerFactory, I have added false in the JsonDeserializer.

Thanks for contributing an answer to Stack Overflow!

  • Please be sure to answer the question. Provide details and share your research!

But avoid

  • Asking for help, clarification, or responding to other answers.
  • Making statements based on opinion; back them up with references or personal experience.

To learn more, see our tips on writing great answers.