I am sending an array of JSON data to a Kafka topic from a Spring Boot application, but I get the following error:

error :org.apache.kafka.common.config.ConfigException: Invalid value 
org.apache.kafka.common.serialization.StringSerializer; for 
configuration key.serializer: Class 
org.apache.kafka.common.serialization.StringSerializer; could not be found.

I have tried to change serialization configuration to this:

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer;");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer;");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer;");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer;");

Configuration file and Service File code:

@Configuration
public class KafkaProducerConfig {
    @Bean
    private static ProducerFactory<String, String> producerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer;");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer;");
        // The following properties are used by LiKafkaProducerImpl
        props.put("large.message.enabled", "true");
        props.put("max.message.segment.bytes", 1000 * 1024);
        props.put("segment.serializer", DefaultSegmentSerializer.class.getName());
        props.put("auditor.class", LoggingAuditor.class.getName());
        return new DefaultKafkaProducerFactory(props);
    }
}
@Service
public class KafkaSender {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSender.class);
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Value("${kafka.topic.name}")
    private String topicName;

    public void sendData(List<Student> student) {
        System.out.println("Inside Student" + student.toString());
        System.out.println("Inside Student" + student);
        // TODO Auto-generated method stub
        Map<String, Object> headers = new HashMap<>();
        headers.put(KafkaHeaders.TOPIC, topicName);
        System.out.println("\nStudent=  " + headers);
        // Construct a JSONObject from a Map.
        JSONObject HeaderObject = new JSONObject(headers);
        System.out.println("\nUsing new JSONObject() ==> " + HeaderObject);
        final String record = HeaderObject.toString();
        final int recordSize = record.length();
        kafkaTemplate.send(new GenericMessage<>(student, headers));
        LOGGER.info("Data - " + student + " sent to Kafka Topic - " + topicName);
    }
}

POST json:

[
  {
    "studentId": "Q45678123", "firstName": "abc", "lastName": "xyz", "age": "12",
    "address": { "apartment": "apt 123", "street": "street Info", "state": "state", "city": "city", "postCode": "12345" }
  },
  {
    "studentId": "Q45678123", "firstName": "abc", "lastName": "xyz", "age": "12",
    "address": { "apartment": "apt 123", "street": "street Info", "state": "state", "city": "city", "postCode": "12345" }
  }
]

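You need to remove the semicolon from the end of the values. The semicolon sits inside the string, so Kafka treats it as part of the class name and cannot find the class: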

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

Or you could use the class.getName() method, as you did for the segment serializer. I would suggest that is safer, because the compiler then checks that the serializer class you want actually exists, and a typo in the name cannot slip through to runtime.
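To illustrate the difference, here is a minimal sketch that uses only the JDK. It assumes the configured name is resolved reflectively, which is what the "could not be found" error suggests; `java.lang.String` stands in for the Kafka serializer class, and `SerializerNameDemo` and `canLoad` are hypothetical names for this example only.

```java
public class SerializerNameDemo {
    // Stand-in for a reflective class lookup by name (assumption: the
    // config value is resolved in a comparable way).
    static boolean canLoad(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Hand-typed name with a stray semicolon inside the string literal.
        String typed = "java.lang.String;";
        // Name derived from the class itself; a typo here would not compile.
        String derived = String.class.getName();

        System.out.println(typed + " loadable: " + canLoad(typed));
        System.out.println(derived + " loadable: " + canLoad(derived));
    }
}
```

In the producer configuration the equivalent would be `props.put("key.serializer", StringSerializer.class.getName());`, with `StringSerializer` imported from `org.apache.kafka.common.serialization`.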

thanks for the answer. i tried it but getting following error: org.apache.kafka.common.errors.SerializationException: Can't convert value of class java.util.ArrayList to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer – Vaibhav Shelar Sep 16, 2019 at 4:51

Note: sending a list of data is not recommended, and individual objects won't arrive in the topic. You'll end up with one message containing the entire list. If that's not expected, you need a loop – OneCricketeer Sep 16, 2019 at 5:11

Thanks for help. it worked. now working on consumer side. but in consumer getting following error-org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message I changed Deserializer value to String. – Vaibhav Shelar Sep 16, 2019 at 6:20
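The loop suggested in the comments might look roughly like the sketch below. It is JDK-only so it can run on its own: the `sender` callback is a hypothetical stand-in for a call such as `kafkaTemplate.send(topic, value)`, and `String.valueOf` is a placeholder for whatever JSON conversion the application actually uses. `PerRecordSendDemo` and `sendEach` are illustrative names, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class PerRecordSendDemo {
    // Sends each element as its own record instead of one record that
    // contains the whole list. Returns the number of records handed over.
    static <T> int sendEach(String topic, List<T> items, BiConsumer<String, String> sender) {
        int sent = 0;
        for (T item : items) {
            // Placeholder conversion; a real application would serialize to JSON here.
            String value = String.valueOf(item);
            sender.accept(topic, value);
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        // Hypothetical sender that just records what it was given.
        BiConsumer<String, String> sender = (topic, value) -> delivered.add(topic + ":" + value);

        int count = sendEach("students", List.of("student-1", "student-2"), sender);
        System.out.println(count + " records: " + delivered);
    }
}
```

Converting each element to a String before sending also matches a value serializer of StringSerializer, which cannot handle an ArrayList directly.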
