Collectives™ on Stack Overflow
Find centralized, trusted content and collaborate around the technologies you use most.
Learn more about Collectives
Teams
Q&A for work
Connect and share knowledge within a single location that is structured and easy to search.
Learn more about Teams
Ask Question
I have creating a simple Kafka Producer & Consumer.I am using kafka_2.11-0.9.0.0. Here is my Producer code.
public class KafkaProducerTest {
public static String topicName = "test-topic-2";
public static void main(String[] args) {
// TODO Auto-generated method stub
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
StringSerializer.class.getName());
props.put("value.serializer",
StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer(props);
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
topicName, Integer.toString(i), Integer.toString(i));
System.out.println(producerRecord);
producer.send(producerRecord);
producer.close();
While starting the bundle I a facing the below error:
2016-05-20 09:44:57,792 | ERROR | nsole user karaf | ShellUtil | 44 - org.apache.karaf.shell.core - 4.0.3 | Exception caught while executing command
org.apache.karaf.shell.support.MultiException: Error executing command on bundles:
Error starting bundle162: Activator start error in bundle NewKafkaArtifact [162].
at org.apache.karaf.shell.support.MultiException.throwIf(MultiException.java:61)
at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:69)[24:org.apache.karaf.bundle.core:4.0.3]
at org.apache.karaf.bundle.command.BundlesCommand.execute(BundlesCommand.java:54)[24:org.apache.karaf.bundle.core:4.0.3]
at org.apache.karaf.shell.impl.action.command.ActionCommand.execute(ActionCommand.java:83)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.karaf.shell.impl.console.osgi.secured.SecuredCommand.execute(SecuredCommand.java:67)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.karaf.shell.impl.console.osgi.secured.SecuredCommand.execute(SecuredCommand.java:87)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.Closure.executeCmd(Closure.java:480)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.Closure.executeStatement(Closure.java:406)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.Pipe.run(Pipe.java:108)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.Closure.execute(Closure.java:182)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.Closure.execute(Closure.java:119)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.felix.gogo.runtime.CommandSessionImpl.execute(CommandSessionImpl.java:94)[44:org.apache.karaf.shell.core:4.0.3]
at org.apache.karaf.shell.impl.console.ConsoleSessionImpl.run(ConsoleSessionImpl.java:270)[44:org.apache.karaf.shell.core:4.0.3]
at java.lang.Thread.run(Thread.java:745)[:1.8.0_66]
Caused by: java.lang.Exception: Error starting bundle162: Activator start error in bundle NewKafkaArtifact [162].
at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:66)[24:org.apache.karaf.bundle.core:4.0.3]
... 12 more
Caused by: org.osgi.framework.BundleException: Activator start error in bundle NewKafkaArtifact [162].
at org.apache.felix.framework.Felix.activateBundle(Felix.java:2276)[org.apache.felix.framework-5.4.0.jar:]
at org.apache.felix.framework.Felix.startBundle(Felix.java:2144)[org.apache.felix.framework-5.4.0.jar:]
at org.apache.felix.framework.BundleImpl.start(BundleImpl.java:998)[org.apache.felix.framework-5.4.0.jar:]
at org.apache.karaf.bundle.command.Start.executeOnBundle(Start.java:38)[24:org.apache.karaf.bundle.core:4.0.3]
at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:64)[24:org.apache.karaf.bundle.core:4.0.3]
... 12 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.StringSerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.StringSerializer could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:145)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:317)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
at com.NewKafka.NewKafkaArtifact.KafkaProducerTest.main(KafkaProducerTest.java:25)[162:NewKafkaArtifact:0.0.1.SNAPSHOT]
at com.NewKafka.NewKafkaArtifact.StartKafka.start(StartKafka.java:11)[162:NewKafkaArtifact:0.0.1.SNAPSHOT]
at org.apache.felix.framework.util.SecureAction.startActivator(SecureAction.java:697)[org.apache.felix.framework-5.4.0.jar:]
at org.apache.felix.framework.Felix.activateBundle(Felix.java:2226)[org.apache.felix.framework-5.4.0.jar:]
... 16 more
I have tried setting the key.serializer
and value.serializer
like below:
props.put("key.serializer",StringSerializer.class.getName());
props.put("value.serializer",StringSerializer.class.getName());
Also like, But still getting the same error. What is I am doing wrong here.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
–
I find the reason by reading the kafka client source code.
kafka client use Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader())
to get the Class object, and the create the instance, the key point is the classLoader, which is specified by the last param, the implementation of method Utils.getContextOrKafkaClassLoader()
is
public static ClassLoader getContextOrKafkaClassLoader() {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (cl == null)
return getKafkaClassLoader();
return cl;
so, by default, the Class object of org.apache.kafka.common.serialization.StringSerializer
is load by the applicationClassLoader, if your target class is not loaded by the applicationClassLoader, this problem will happend !
to solve the problem, simply set the ContextClassLoader of current thread to null before new KafkaProducer instance like this
Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer(props);
hope my answer can let you know what happend .
–
–
–
Its issue with the version you are using.
It was also suggested to version 0.8.2.2_1.
Suggest you to adjust the version of kafka you are using and give a try.
code wise, I cross checked many code samples in kafka dev list and seems like you have written in right way.
i.e Thread.currentThread().setContextClassLoader(null);
–
The issue appears to be with the class loader, as @Ram Ghadiyaram indicated in his answer. In order to get this working with kafka-clients 2.x, I had to do the following:
public Producer<String, String> createProducer() {
ClassLoader original = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(null);
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
BOOTSTRAP_SERVERS);
... etc ...
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Thread.currentThread().setContextClassLoader(original);
return producer;
This allows the system to continue loading additional classes with the original classloader. This was needed in Wildfly/JBoss (the specific app I'm working with is Keycloak).
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class FxDateProducer {
public static void main(String[] args) throws Exception{
if(args.length == 0){
System.out.println("Enter topic name”);
return;
String topicName = args[0].toString();
Properties props = new Properties();
//Assign localhost id
props.put("bootstrap.servers", “localhost:9092");
//Set acknowledgements for producer requests.
props.put("acks", “all");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
//Specify buffer size in config
props.put("batch.size", 16384);
//Reduce the no of requests less than 0
props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer
<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName,
Integer.toString(i), Integer.toString(i)));
System.out.println(“Message sent successfully”);
producer.close();
Recently i found the solution. Setting the Thead Context loader to null resolved the issue for me. Thanks.
Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer(props);
It happens because of kafka-version issue. Make sure, you use the correct kafka version. The version that I used is 'kafka_2.12-1.0.1'
But try using below properties in your code .This fixed my issue.
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
Earlier I was using below properties which was causing the issue.
//props.put("key.serializer","org.apache.kafka.common.serialization.Stringserializer");
//props.put("value.serializer","org.apache.kafka.common.serialization.Stringserializer");
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.