Error while fetching metadata with correlation id 92 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}


I have created a sample application to test my producer code. The application runs fine when I send data without a partitioning key, but when I specify a key for data partitioning I get this error:

[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 37 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 38 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 39 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}

I get this for both the consumer and the producer. I have searched a lot online, and the usual suggestion is to verify the kafka.acl settings. I'm using Kafka on HDInsight, and I have no idea how to verify them or how to solve this issue.

My cluster has the following configuration:

  • Head nodes: 2
  • Worker nodes: 4
  • ZooKeeper nodes: 3

My producer code:

    // Imports required by the enclosing class
    import java.io.IOException;
    import java.util.Properties;
    import java.util.Random;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public static void produce(String brokers, String topicName) throws IOException {
        // Set properties used to configure the producer
        Properties properties = new Properties();
        // Set the brokers (bootstrap servers)
        properties.setProperty("bootstrap.servers", brokers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // specify the protocol for Domain Joined clusters
        // To create an idempotent, transactional producer
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        producer.initTransactions();
        // So we can generate random sentences
        Random random = new Random();
        String[] sentences = new String[] {
                "the cow jumped over the moon",
                "an apple a day keeps the doctor away",
                "four score and seven years ago",
                "snow white and the seven dwarfs",
                "i am at two with nature",
        };
        for (String sentence : sentences) {
            // Send the sentence to the test topic, keyed by its first two characters
            try {
                String key = sentence.substring(0, 2);
                producer.beginTransaction();
                // get() blocks until the broker acknowledges the record
                producer.send(new ProducerRecord<String, String>(topicName, key, sentence)).get();
            } catch (Exception ex) {
                System.out.print(ex.getMessage());
                throw new IOException(ex.toString());
            }
            producer.commitTransaction();
        }
    }

    Also, my topic has 3 partitions and a replication factor of 3.

    The error clearly states that the topic (or partition) you are producing to does not exist.

    Ultimately, you will need to describe the topic (via the CLI, kafka-topics --describe --topic <topicName>, or by other means) to verify whether this is true.

    Regarding "Kafka on HDInsight and I have no idea how to verify it and solve this issue":

    ACLs are only set up if you installed the cluster with them, but I believe you can still list ACLs via zookeeper-shell or by SSHing into one of the Hadoop master nodes.

    I have made sure that my topic exists, and the code runs perfectly fine when I send without a partition key. Why does it show an error when I add a partition key? – Anonymous Apr 15, 2020 at 5:26

    Not sure what you mean by "partition key". It would be the "record key", and even if you don't actually set one, it is still sent as null. What partitioner are you using? – OneCricketeer Apr 15, 2020 at 16:44
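To illustrate the point about record keys in the comment above: with a non-null key, the Java client's default partitioner derives the partition from a hash of the serialized key, modulo the partition count. The following is a minimal sketch of that idea, not the client's actual code. The class name `KeyPartitionSketch` and the method `partitionFor` are hypothetical, and the `murmur2` routine is written to mirror the hash the client is known to use, so treat exact partition numbers as illustrative only.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper class for illustration; not part of kafka-clients.
class KeyPartitionSketch {

    // 32-bit murmur2 hash, modeled on the variant used by Kafka's Java client.
    static int murmur2(byte[] data) {
        int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        // Mix the input four bytes at a time (little-endian).
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining one to three bytes, if any.
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
                // fall through
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
                // fall through
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
                break;
            default:
                break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Maps a non-null String key to a partition in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Clear the sign bit so the modulo result is never negative.
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Same two-character keys the question derives from its sentences.
        String[] keys = {"th", "an", "fo", "sn", "i "};
        for (String key : keys) {
            System.out.println("key='" + key + "' -> partition " + partitionFor(key, 3));
        }
    }
}
```

Because the mapping depends only on the key bytes and the partition count, a keyed record always needs metadata for one specific partition of the topic, which is why a topic whose partitions have no leader can fail for keyed sends.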

    I too had the same issue while creating a new topic. And when I described the topic, I could see that leaders were not assigned to the topic partitions.

    Topic: xxxxxxxxx Partition: 0 Leader: none Replicas: 3,2,1 Isr:
    Topic: xxxxxxxxx Partition: 1 Leader: none Replicas: 1,3,2 Isr:

    After some googling, I figured out that this can happen when there is an issue with the controller broker, so I restarted the controller broker.

    After that, everything worked as expected.
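The leader check described in this answer can be automated by scanning the text printed by kafka-topics --describe. Below is a minimal sketch that assumes the "Partition: N Leader: X" layout shown above; the class name `LeaderCheck` is hypothetical, and treating both `none` and `-1` as "no leader" is an assumption, since the exact wording may differ between Kafka versions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; it only parses text and does not talk to Kafka.
class LeaderCheck {

    // Matches "Partition: <n> Leader: <value>" with any whitespace (spaces or tabs) in between.
    private static final Pattern PARTITION_LEADER =
            Pattern.compile("Partition:\\s*(\\d+)\\s+Leader:\\s*(\\S+)");

    // Returns the partition numbers whose leader is reported as "none" or "-1".
    static List<Integer> leaderlessPartitions(String describeOutput) {
        List<Integer> result = new ArrayList<>();
        Matcher matcher = PARTITION_LEADER.matcher(describeOutput);
        while (matcher.find()) {
            String leader = matcher.group(2);
            if (leader.equalsIgnoreCase("none") || leader.equals("-1")) {
                result.add(Integer.parseInt(matcher.group(1)));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample text in the same shape as the output quoted in the answer.
        String sample = String.join("\n",
                "Topic: myTest Partition: 0 Leader: none Replicas: 3,2,1 Isr:",
                "Topic: myTest Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2");
        System.out.println("Partitions without a leader: " + leaderlessPartitions(sample));
    }
}
```

An empty result means every listed partition reported a leader; any partition number in the result is a candidate explanation for UNKNOWN_TOPIC_OR_PARTITION even though the topic exists.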

    If the topic exists but you're still seeing this error, it could mean that the supplied list of brokers is incorrect. Check the bootstrap.servers value; it should point to the Kafka cluster where the topic resides.

    I saw the same issue: I have multiple Kafka clusters and the topic clearly existed, but my list of brokers was incorrect.
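A quick way to sanity-check a bootstrap.servers value is to split it into host:port entries and try a plain TCP connection to each one. This is only a sketch using the standard library. The class name `BootstrapCheck` and the default `localhost:9092` are assumptions, and a successful connection shows only that something is listening there, not that it is the cluster holding your topic.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; it does not use the Kafka client library.
class BootstrapCheck {

    // Splits a comma-separated bootstrap.servers value into unresolved host:port addresses.
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            addresses.add(InetSocketAddress.createUnresolved(host, port));
        }
        return addresses;
    }

    // Returns true if a TCP connection to the address succeeds within timeoutMs.
    static boolean reachable(InetSocketAddress address, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Build a fresh address so the host name is resolved at connect time.
            socket.connect(new InetSocketAddress(address.getHostString(), address.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the same string you give to bootstrap.servers; the default is a placeholder.
        String brokers = args.length > 0 ? args[0] : "localhost:9092";
        for (InetSocketAddress address : parse(brokers)) {
            System.out.println(address.getHostString() + ":" + address.getPort()
                    + " reachable=" + reachable(address, 2000));
        }
    }
}
```

If every entry is reachable and the error persists, compare the broker list against the cluster where the topic was created, as this answer suggests.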
