Collectives™ on Stack Overflow

Find centralized, trusted content and collaborate around the technologies you use most.

Learn more about Collectives

Teams

Q&A for work

Connect and share knowledge within a single location that is structured and easy to search.

Learn more about Teams

Kafka Producer - org.apache.kafka.common.serialization.StringSerializer could not be found

Ask Question

I have creating a simple Kafka Producer & Consumer.I am using kafka_2.11-0.9.0.0. Here is my Producer code.

public class KafkaProducerTest {
public static String topicName = "test-topic-2";
public static void main(String[] args) {
    // TODO Auto-generated method stub
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer",
            StringSerializer.class.getName());
    props.put("value.serializer",
            StringSerializer.class.getName());
    Producer<String, String> producer = new KafkaProducer(props);
    for (int i = 0; i < 100; i++) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
                topicName, Integer.toString(i), Integer.toString(i));
        System.out.println(producerRecord);
        producer.send(producerRecord);
    producer.close();

While starting the bundle I a facing the below error:

2016-05-20 09:44:57,792 | ERROR | nsole user karaf | ShellUtil                        | 44 - org.apache.karaf.shell.core - 4.0.3 | Exception caught while executing command
org.apache.karaf.shell.support.MultiException: Error executing command on bundles:
    Error starting bundle162: Activator start error in bundle NewKafkaArtifact [162].
    at org.apache.karaf.shell.support.MultiException.throwIf(MultiException.java:61)
    at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:69)[24:org.apache.karaf.bundle.core:4.0.3]
    at org.apache.karaf.bundle.command.BundlesCommand.execute(BundlesCommand.java:54)[24:org.apache.karaf.bundle.core:4.0.3]
    at org.apache.karaf.shell.impl.action.command.ActionCommand.execute(ActionCommand.java:83)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.karaf.shell.impl.console.osgi.secured.SecuredCommand.execute(SecuredCommand.java:67)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.karaf.shell.impl.console.osgi.secured.SecuredCommand.execute(SecuredCommand.java:87)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.Closure.executeCmd(Closure.java:480)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.Closure.executeStatement(Closure.java:406)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.Pipe.run(Pipe.java:108)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.Closure.execute(Closure.java:182)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.Closure.execute(Closure.java:119)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.CommandSessionImpl.execute(CommandSessionImpl.java:94)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.karaf.shell.impl.console.ConsoleSessionImpl.run(ConsoleSessionImpl.java:270)[44:org.apache.karaf.shell.core:4.0.3]
    at java.lang.Thread.run(Thread.java:745)[:1.8.0_66]
Caused by: java.lang.Exception: Error starting bundle162: Activator start error in bundle NewKafkaArtifact [162].
    at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:66)[24:org.apache.karaf.bundle.core:4.0.3]
    ... 12 more
Caused by: org.osgi.framework.BundleException: Activator start error in bundle NewKafkaArtifact [162].
    at org.apache.felix.framework.Felix.activateBundle(Felix.java:2276)[org.apache.felix.framework-5.4.0.jar:]
    at org.apache.felix.framework.Felix.startBundle(Felix.java:2144)[org.apache.felix.framework-5.4.0.jar:]
    at org.apache.felix.framework.BundleImpl.start(BundleImpl.java:998)[org.apache.felix.framework-5.4.0.jar:]
    at org.apache.karaf.bundle.command.Start.executeOnBundle(Start.java:38)[24:org.apache.karaf.bundle.core:4.0.3]
    at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:64)[24:org.apache.karaf.bundle.core:4.0.3]
    ... 12 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.StringSerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.StringSerializer could not be found.
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:145)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:317)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at com.NewKafka.NewKafkaArtifact.KafkaProducerTest.main(KafkaProducerTest.java:25)[162:NewKafkaArtifact:0.0.1.SNAPSHOT]
    at com.NewKafka.NewKafkaArtifact.StartKafka.start(StartKafka.java:11)[162:NewKafkaArtifact:0.0.1.SNAPSHOT]
    at org.apache.felix.framework.util.SecureAction.startActivator(SecureAction.java:697)[org.apache.felix.framework-5.4.0.jar:]
    at org.apache.felix.framework.Felix.activateBundle(Felix.java:2226)[org.apache.felix.framework-5.4.0.jar:]
    ... 16 more

I have tried setting the key.serializer and value.serializer like below:

props.put("key.serializer",StringSerializer.class.getName());
props.put("value.serializer",StringSerializer.class.getName());

Also like, But still getting the same error. What is I am doing wrong here.

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                hey @Sanjeev try using   props.put("key.serializer",           "org.apache.kafka.common.serialization.StringSerializer");   props.put("value.serializer",           "org.apache.kafka.common.serialization.StringSerializer"); I have posted full example
– Vipul Gulhane
                Sep 22, 2018 at 4:19

I find the reason by reading the kafka client source code.

kafka client use Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader()) to get the Class object, and the create the instance, the key point is the classLoader, which is specified by the last param, the implementation of method Utils.getContextOrKafkaClassLoader() is

public static ClassLoader getContextOrKafkaClassLoader() {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    if (cl == null)
        return getKafkaClassLoader();
        return cl;

so, by default, the Class object of org.apache.kafka.common.serialization.StringSerializer is load by the applicationClassLoader, if your target class is not loaded by the applicationClassLoader, this problem will happend !

to solve the problem, simply set the ContextClassLoader of current thread to null before new KafkaProducer instance like this

Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer(props);

hope my answer can let you know what happend .

If I add this Thread.currentThread().setContextClassLoader(null); I get problem in registering other bean. – Sheel Dec 15, 2018 at 5:00 @Sheel You might save current thread context classloader in a variable, set context class loader for current thread to null, create kafka producer and then restore mentioned class loader from the variable. There is an example: https://stackoverflow.com/a/53653490/1673775. This is still workaround. You might consider using other version of kafka or try to find out what is going on with classloading in your app if you need a true solution. – luke Dec 20, 2018 at 8:50 Thanks buddy, actually I was importing wrong file.This issue is solved.Now struggling with another one,Let you know. – Sheel Dec 21, 2018 at 13:25

Its issue with the version you are using. It was also suggested to version 0.8.2.2_1. Suggest you to adjust the version of kafka you are using and give a try. code wise, I cross checked many code samples in kafka dev list and seems like you have written in right way.

i.e Thread.currentThread().setContextClassLoader(null);

Sorry for late reply. Updating the kafka version to 0.8.2.2_1 or to latest didn't work. Adding the below line before setting the properties did the trick. Thread.currentThread().setContextClassLoader(null); – Sanjeev Sep 22, 2016 at 13:32

The issue appears to be with the class loader, as @Ram Ghadiyaram indicated in his answer. In order to get this working with kafka-clients 2.x, I had to do the following:

public Producer<String, String> createProducer() {
            ClassLoader original = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(null);
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        BOOTSTRAP_SERVERS);
    ... etc ...
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    Thread.currentThread().setContextClassLoader(original);
    return producer;

This allows the system to continue loading additional classes with the original classloader. This was needed in Wildfly/JBoss (the specific app I'm working with is Keycloak).

import java.util.Properties; 
import org.apache.kafka.clients.producer.Producer;    
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class FxDateProducer {
   public static void main(String[] args) throws Exception{
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      String topicName = args[0].toString(); 
      Properties props = new Properties();
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      //Specify buffer size in config
      props.put("batch.size", 16384);
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      props.put("key.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();

Recently i found the solution. Setting the Thead Context loader to null resolved the issue for me. Thanks.

Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer(props);

It happens because of kafka-version issue. Make sure, you use the correct kafka version. The version that I used is 'kafka_2.12-1.0.1'

But try using below properties in your code .This fixed my issue.

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

Earlier I was using below properties which was causing the issue.

//props.put("key.serializer","org.apache.kafka.common.serialization.Stringserializer");
//props.put("value.serializer","org.apache.kafka.common.serialization.Stringserializer");
        

Thanks for contributing an answer to Stack Overflow!

  • Please be sure to answer the question. Provide details and share your research!

But avoid

  • Asking for help, clarification, or responding to other answers.
  • Making statements based on opinion; back them up with references or personal experience.

To learn more, see our tips on writing great answers.