Collectives™ on Stack Overflow
Find centralized, trusted content and collaborate around the technologies you use most.
Learn more about Collectives
Teams
Q&A for work
Connect and share knowledge within a single location that is structured and easy to search.
Learn more about Teams
Ask Question
I have created a sample application to check my producer's code. My application runs fine when I'm sending data without a partitioning key. But, on specifying a key for data partitioning I'm getting the error:
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 37 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 38 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 39 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
for both consumer and producer. I have searched a lot on the internet, they have suggested to verify kafka.acl settings. I'm using kafka on HDInsight and I have no idea how to verify it and solve this issue.
My cluster has following configuration:
Head Node: 2
Worker Node:4
Zookeeper: 3
MY producer code:
public static void produce(String brokers, String topicName) throws IOException{
// Set properties used to configure the producer
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// specify the protocol for Domain Joined clusters
//To create an Idempotent Producer
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
producer.initTransactions();
// So we can generate random sentences
Random random = new Random();
String[] sentences = new String[] {
"the cow jumped over the moon",
"an apple a day keeps the doctor away",
"four score and seven years ago",
"snow white and the seven dwarfs",
"i am at two with nature",
for(String sentence: sentences){
// Send the sentence to the test topic
String key=sentence.substring(0,2);
producer.beginTransaction();
producer.send(new ProducerRecord<String, String>(topicName,key,sentence)).get();
catch (Exception ex)
System.out.print(ex.getMessage());
throw new IOException(ex.toString());
producer.commitTransaction();
Also, My topic consists of 3 partitions with replication factor=3
The error clearly states that the topic (or partition) you are producing to does not exist.
Ultimately, you will need to describe the topic (via CLI kafka-topics --describe --topic <topicName> or other means) to verify if this is true
Kafka on HDInsight and I have no idea how to verify it and solve this issue.
ACLs are only setup if you installed the cluster with them, but I believe you can still list ACLs via zookeper-shell or SSHing into one of Hadoop masters.
–
–
I too had the same issue while creating a new topic. And when I described the topic, I could see that leaders were not assigned to the topic partitions.
Topic: xxxxxxxxx Partition: 0 Leader: none Replicas: 3,2,1 Isr:
Topic: xxxxxxxxx Partition: 1 Leader: none Replicas: 1,3,2 Isr:
After some googling, figured out that this could happen when we some issue with controller broker, so restarted the controller broker.
And Everything worked as expected...!
If the topic exists but you're still seeing this error, it could mean that the supplied list of brokers is incorrect. Check the bootstrap.servers value, it should be pointing to the right Kafka cluster where the topic resides.
I saw the same issue and I have multiple Kafka clusters and the topic clearly exists. However, my list of brokers was incorrect.
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.