I have seen and implemented
KafkaTemplate.send(TOPIC, message)
with the default partitioner class.
In that call I am not passing a key. I have a simple
custom partitioner class
and I also want to send to the Kafka server with
KafkaTemplate.send(TOPIC, key, message)
where the producer config sets my custom partitioner class for partitioning.
I saw this question:
Will send(Topic, Key, Message) method of KafkaTemplate calls Partition method if I provide custom Partitioner?
but I didn't fully understand it.
My simple custom partitioner class:
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CustomPartitionar implements Partitioner {
    private PartitionMapper newMapper;

    public CustomPartitionar() {
        newMapper = new PartitionMapper();
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int partition = 0;
        String userName = (String) key;
        // Find the id of the current user based on the username from another mapper class
        Integer userId = newMapper.findUserId(userName);
        // If the userId is not found, the default partition is 0
        if (userId != null) {
            partition = userId;
        }
        return partition;
    }

    @Override
    public void close() {
    }
}
I added this class to the producer factory config:
config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitionar.class);
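One thing a partitioner like this may need to watch: it returns the raw userId as the partition number, so a userId larger than the topic's partition count would point at a partition that does not exist. The following is only a minimal sketch of one way to keep the result in range, assuming a modulo mapping is acceptable. UserPartitionSketch, register and partitionFor are hypothetical names standing in for the PartitionMapper above, and numPartitions is passed in by hand here; in a real partitioner it could come from something like cluster.partitionCountForTopic(topic).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the question's PartitionMapper (all names are assumptions).
class UserPartitionSketch {
    private final Map<String, Integer> userIds = new HashMap<>();

    // Registers a user name with its numeric id.
    void register(String userName, int userId) {
        userIds.put(userName, userId);
    }

    // Maps a user name to a partition in the range [0, numPartitions).
    // Unknown users fall back to partition 0, as in the question's code.
    int partitionFor(String userName, int numPartitions) {
        Integer userId = userIds.get(userName);
        if (userId == null) {
            return 0;
        }
        // floorMod keeps the result non-negative even for negative ids.
        return Math.floorMod(userId, numPartitions);
    }
}
```

With this mapping, two users whose ids differ by a multiple of the partition count share a partition, which may or may not fit the "one partition per user/group" goal.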
My key actually comes from "message.getReceiver()" and the topic from "message.getTopic()", so each message should go to the desired topic and to the partition that belongs to that user/group. So I just want to send like this:
KafkaTemplate.send(message.getTopic(), message.getReceiver(), message)
Is this possible in a simple way, or am I missing something?
/**
 * Send the data to the default topic with no key or partition.
 * @param data The data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> sendDefault(V data);

/**
 * Send the data to the default topic with the provided key and no partition.
 * @param key the key.
 * @param data The data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

/**
 * Send the data to the default topic with the provided key and partition.
 * @param partition the partition.
 * @param key the key.
 * @param data the data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

/**
 * Send the data to the default topic with the provided key and partition.
 * @param partition the partition.
 * @param timestamp the timestamp of the record.
 * @param key the key.
 * @param data the data.
 * @return a Future for the {@link SendResult}.
 * @since 1.3
 */
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

/**
 * Send the data to the provided topic with no key or partition.
 * @param topic the topic.
 * @param data The data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> send(String topic, V data);

/**
 * Send the data to the provided topic with the provided key and no partition.
 * @param topic the topic.
 * @param key the key.
 * @param data The data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

/**
 * Send the data to the provided topic with the provided key and partition.
 * @param topic the topic.
 * @param partition the partition.
 * @param key the key.
 * @param data the data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

/**
 * Send the data to the provided topic with the provided key and partition.
 * @param topic the topic.
 * @param partition the partition.
 * @param timestamp the timestamp of the record.
 * @param key the key.
 * @param data the data.
 * @return a Future for the {@link SendResult}.
 * @since 1.3
 */
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

/**
 * Send the provided {@link ProducerRecord}.
 * @param record the record.
 * @return a Future for the {@link SendResult}.
 * @since 1.3
 */
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

/**
 * Send a message with routing information in message headers. The message payload
 * may be converted before sending.
 * @param message the message to send.
 * @return a Future for the {@link SendResult}.
 * @see org.springframework.kafka.support.KafkaHeaders#TOPIC
 * @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID
 * @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY
 */
ListenableFuture<SendResult<K, V>> send(Message<?> message);
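To connect these overloads to the question: the overloads that take no partition argument, such as send(String topic, K key, V data), produce a record with no partition set, and in that case the Kafka producer asks the configured Partitioner to pick one. The overloads that take an Integer partition bypass the partitioner when the value is non-null. The sketch below only models that rule in plain Java so it can run without a broker. PartitionDispatchSketch and resolvePartition are hypothetical names, and the lambda stands in for a custom partitioner; none of this is the Spring or Kafka API.

```java
import java.util.function.ToIntFunction;

// Plain-Java model of the dispatch rule (hypothetical names, not the Kafka API).
class PartitionDispatchSketch {

    // An explicit partition wins; otherwise the configured partitioner
    // decides based on the key.
    static int resolvePartition(Integer explicitPartition, String key,
                                ToIntFunction<String> partitioner) {
        if (explicitPartition != null) {
            return explicitPartition;
        }
        return partitioner.applyAsInt(key);
    }

    public static void main(String[] args) {
        // Stand-in for a custom partitioner: "alice" maps to 2, everyone else to 0.
        ToIntFunction<String> custom = key -> "alice".equals(key) ? 2 : 0;

        // Models send(topic, key, data): no partition given, partitioner is consulted.
        System.out.println(resolvePartition(null, "alice", custom));

        // Models send(topic, partition, key, data): the explicit partition is used.
        System.out.println(resolvePartition(5, "alice", custom));
    }
}
```

Under that rule, KafkaTemplate.send(message.getTopic(), message.getReceiver(), message) should reach the custom partitioner with the receiver as the key, provided the partitioner class is set in the producer config.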