相关文章推荐
老实的玉米  ·  springboot+kafka中@Kafk ...·  1 月前    · 
难过的盒饭  ·  Canal ...·  1 月前    · 
失落的企鹅  ·  Azure Active ...·  1 年前    · 
愉快的钥匙扣  ·  Microsoft.Azure.WebJob ...·  1 年前    · 
Collectives™ on Stack Overflow

Find centralized, trusted content and collaborate around the technologies you use most.

Learn more about Collectives

Teams

Q&A for work

Connect and share knowledge within a single location that is structured and easy to search.

Learn more about Teams

I saw and implemented the method KafkaTemplate.send(TOPIC,message) with default partitioner class.

But here, I am not passing keys. I have a simple custom partitioner class and I also wanna send to kafka server like KafkaTemplate(TOPIC,key,message) where in producerConfig I set my customPartitioner class for partitioning.

I saw this Will send(Topic, Key, Message) method of KafkaTemplate calls Partition method if I provide custom Partitioner? but I didn't get it fully.

  • my simple customPartitioner class:
  • public class CustomPartitionar implements Partitioner {
       private PartitionMapper newMapper;
       public CustomPartitionar(){
           newMapper = new PartitionMapper();
       @Override
       public void configure(Map<String, ?> configs) {
       @Override
       public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,Cluster cluster) {
           int partition = 0;
           String userName = (String) key;
           // Find the id of current user based on the username from another mapper class
           Integer userId = newMapper.findUserId(userName);
           // If the userId not found, default partition is 0
           if (userId != null) {
               partition = userId;
           return partition;
       @Override
       public void close() {
    
  • added this class to producerFactory:
  • config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitionar.class);
    
  • actually my key will be get from "message.getReceiver()" and topic will be get from "message.getTopic()" so my messages will go to desired topic and partition belongs to that user/group..so I just wanna send like:
  • KafkaTemplate.send(message.getTopic(),message.getReceiver(),message)
    

    can this be possible in a simple way or am I missing something?

    It's not clear what you question is; what you have should work fine. If it's not, then explain what's happening. – Gary Russell May 19, 2020 at 15:26 does KafkaTemplate already has method ".send(topic,key,value)" ? as far as I saw kafkatemplate has method like ".send(topic,message)" ..I am confused how can I also specify the "key" in here while sending after adding custom partitionar – Nafiul Alam Fuji May 19, 2020 at 16:56 * Send the data to the default topic with no key or partition. * @param data The data. * @return a Future for the {@link SendResult}. ListenableFuture<SendResult<K, V>> sendDefault(V data); * Send the data to the default topic with the provided key and no partition. * @param key the key. * @param data The data. * @return a Future for the {@link SendResult}. ListenableFuture<SendResult<K, V>> sendDefault(K key, V data); * Send the data to the default topic with the provided key and partition. * @param partition the partition. * @param key the key. * @param data the data. * @return a Future for the {@link SendResult}. ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data); * Send the data to the default topic with the provided key and partition. * @param partition the partition. * @param timestamp the timestamp of the record. * @param key the key. * @param data the data. * @return a Future for the {@link SendResult}. * @since 1.3 ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data); * Send the data to the provided topic with no key or partition. * @param topic the topic. * @param data The data. * @return a Future for the {@link SendResult}. ListenableFuture<SendResult<K, V>> send(String topic, V data); * Send the data to the provided topic with the provided key and no partition. * @param topic the topic. * @param key the key. * @param data The data. * @return a Future for the {@link SendResult}. ListenableFuture<SendResult<K, V>> send(String topic, K key, V data); * Send the data to the provided topic with the provided key and partition. * @param topic the topic. * @param partition the partition. * @param key the key. * @param data the data. * @return a Future for the {@link SendResult}. ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data); * Send the data to the provided topic with the provided key and partition. * @param topic the topic. * @param partition the partition. * @param timestamp the timestamp of the record. * @param key the key. * @param data the data. * @return a Future for the {@link SendResult}. * @since 1.3 ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data); * Send the provided {@link ProducerRecord}. * @param record the record. * @return a Future for the {@link SendResult}. * @since 1.3 ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record); * Send a message with routing information in message headers. The message payload * may be converted before sending. * @param message the message to send. * @return a Future for the {@link SendResult}. * @see org.springframework.kafka.support.KafkaHeaders#TOPIC * @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID * @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY ListenableFuture<SendResult<K, V>> send(Message<?> message); Don't put code in comments; it's unreadable. Edit the question instead and comment that you have done so. I can assure you that send(topic, key, value) is there. The template declaration should be new KafkaTemplate<String, Message>. You will get a compiler error with 3 generic parameters. The 2 genericl parameters represent the key type and value type. With <String, Message> you will be able to use send(String topic, String key, Message message). – Gary Russell May 20, 2020 at 15:20 ` ListenableFuture<SendResult<String, Message_2>> future ; future = (ListenableFuture<SendResult<String, Message_2>>) kafkaTemplate.send(TOPIC,KEY,message); future.addCallback(new ListenableFutureCallback<SendResult<String, Message_2>>() {@override onSuccess() and onFailure.....}` here cannot resolve "addCallback() method"..why is that? in config, producerFactory<String,Message_2> have anything to do with that? – Nafiul Alam Fuji May 20, 2020 at 15:33

    Thanks for contributing an answer to Stack Overflow!

    • Please be sure to answer the question. Provide details and share your research!

    But avoid

    • Asking for help, clarification, or responding to other answers.
    • Making statements based on opinion; back them up with references or personal experience.

    To learn more, see our tips on writing great answers.