All Implemented Interfaces:
EventListener , Aware , BeanNameAware , DisposableBean , SmartInitializingSingleton , ApplicationContextAware , ApplicationListener < ContextStoppedEvent > , KafkaOperations <K, V>
Direct Known Subclasses:
ReplyingKafkaTemplate , RoutingKafkaTemplate
public class KafkaTemplate<K, V> extends Object implements KafkaOperations <K, V>, ApplicationContextAware , BeanNameAware , ApplicationListener < ContextStoppedEvent >, DisposableBean , SmartInitializingSingleton
A template for executing high-level operations. When used with a DefaultKafkaProducerFactory , the template is thread-safe. The producer factory and KafkaProducer ensure this; refer to their respective javadocs.
Author:
Marius Bogoevici, Gary Russell, Igor Stepanov, Artem Bilan, Biju Kunjummen, Endika Gutierrez, Thomas Strauß, Soby Chacko, Gurps Bassi

Nested Class Summary

Nested classes/interfaces inherited from interface org.springframework.kafka.core. KafkaOperations

KafkaOperations.OperationsCallback < K , V , T >, KafkaOperations.ProducerCallback < K , V , T >
Create an instance using the supplied producer factory and autoFlush false.
KafkaTemplate ( ProducerFactory < K , V > producerFactory, boolean autoFlush)
Create an instance using the supplied producer factory and autoFlush setting.
KafkaTemplate ( ProducerFactory < K , V > producerFactory, boolean autoFlush, Map < String , Object > configOverrides)
Create an instance using the supplied producer factory and autoFlush setting.
KafkaTemplate ( ProducerFactory < K , V > producerFactory, Map < String , Object > configOverrides)
Create an instance using the supplied producer factory and properties, with autoFlush false.
protected void
closeProducer (org.apache.kafka.clients.producer.Producer< K , V > producer, boolean inTx)
doSend (org.apache.kafka.clients.producer.ProducerRecord< K , V > producerRecord, io.micrometer.observation.Observation observation)
Send the producer record.
Execute some arbitrary operation(s) on the producer and return the result.
Execute some arbitrary operation(s) on the operations and return the result.
Flush the producer.
The default topic for send methods where a topic is not provided.
Return the KafkaAdmin , used to find the cluster id for observation, if present.
Return the message converter.
Function <org.apache.kafka.clients.producer.ProducerRecord<?, ?>, Map < String , String >>
Return the Micrometer tags provider.
ProducerFactory < K , V >
Return the producer factory used by this template.
Return the producer factory used by this template based on the topic.
protected org.apache.kafka.clients.producer.Producer< K , V >
boolean
Return true if the template is currently running in a transaction on the calling thread.
boolean
Return true if this template, when transactional, allows non-transactional operations.
boolean
Return true if the implementation supports transactions (has a transaction-capable producer factory).
Map <org.apache.kafka.common.MetricName, ? extends org.apache.kafka.common.Metric>
See Producer.metrics() .
List <org.apache.kafka.common.PartitionInfo>
See Producer.partitionsFor(String) .
org.apache.kafka.clients.consumer.ConsumerRecord< K , V >
receive ( String topic, int partition, long offset, Duration pollTimeout)
Receive a single record.
org.apache.kafka.clients.consumer.ConsumerRecords< K , V >
Receive multiple records.
send ( String topic, Integer partition, Long timestamp, K key, V data)
Send the data to the provided topic with the provided key and partition.
send ( String topic, Integer partition, K key, V data)
Send the data to the provided topic with the provided key and partition.
send ( String topic, K key, V data)
Send the data to the provided topic with the provided key and no partition.
send ( String topic, V data)
Send the data to the provided topic with no key or partition.
send (org.apache.kafka.clients.producer.ProducerRecord< K , V > record)
Send the provided ProducerRecord .
send ( Message <?> message)
Send a message with routing information in message headers.
CompletableFuture < SendResult < K , V >>
sendDefault ( Integer partition, Long timestamp, K key, V data)
Send the data to the default topic with the provided key and partition.
sendDefault ( Integer partition, K key, V data)
Send the data to the default topic with the provided key and partition.
Send the data to the default topic with the provided key and no partition.
Send the data to the default topic with no key or partition.
sendOffsetsToTransaction ( Map <org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata)
When running in a transaction, send the consumer offset(s) to the transaction.
setAllowNonTransactional (boolean allowNonTransactional)
Set to true to allow a non-transactional send when the template is transactional.
Set the maximum time to wait when closing a producer; default 5 seconds.
Set a consumer factory for receive operations.
Set the default topic for send methods where a topic is not provided.
Set the KafkaAdmin , used to find the cluster id for observation, if present.
Set the message converter to use.
setMicrometerEnabled (boolean micrometerEnabled)
Set to false to disable micrometer timers, if micrometer is on the class path.
Set additional tags for the Micrometer listener timers.
setMicrometerTagsProvider ( Function <org.apache.kafka.clients.producer.ProducerRecord<?, ?>, Map < String , String >> micrometerTagsProvider)
Set a function to provide dynamic tags based on the producer record.
setObservationEnabled (boolean observationEnabled)
Set to true to enable observation via Micrometer.
setProducerInterceptor (org.apache.kafka.clients.producer.ProducerInterceptor< K , V > producerInterceptor)
Set a producer interceptor on this template.
Set a ProducerListener which will be invoked when Kafka acknowledges a send operation.
setTransactionIdPrefix ( String transactionIdPrefix)
Set a transaction id prefix to override the prefix in the producer factory.

Methods inherited from class java.lang. Object

clone , equals , finalize , getClass , hashCode , notify , notifyAll , toString , wait , wait , wait

Methods inherited from interface org.springframework.kafka.core. KafkaOperations

receive , receive

KafkaTemplate

public KafkaTemplate ( ProducerFactory < K , V > producerFactory)
Create an instance using the supplied producer factory and autoFlush false.
Parameters:
producerFactory - the producer factory.
public KafkaTemplate ( ProducerFactory < K , V > producerFactory, @Nullable Map < String , Object > configOverrides)
Create an instance using the supplied producer factory and properties, with autoFlush false. If the configOverrides is not null or empty, a new DefaultKafkaProducerFactory will be created with merged producer properties with the overrides being applied after the supplied factory's properties.
Parameters:
producerFactory - the producer factory.
configOverrides - producer configuration properties to override.
Since:

KafkaTemplate

public KafkaTemplate ( ProducerFactory < K , V > producerFactory, boolean autoFlush)
Create an instance using the supplied producer factory and autoFlush setting. Set autoFlush to true if you wish for the send operations on this template to occur immediately, regardless of the linger.ms or batch.size property values. This will also block until the broker has acknowledged receipt according to the producer's acks property.

Parameters:
producerFactory - the producer factory.
autoFlush - true to flush after each send.
See Also:
  • Producer.flush()
  • KafkaTemplate

    public KafkaTemplate ( ProducerFactory < K , V > producerFactory, boolean autoFlush, @Nullable Map < String , Object > configOverrides)
    Create an instance using the supplied producer factory and autoFlush setting. Set autoFlush to true if you wish for the send operations on this template to occur immediately, regardless of the linger.ms or batch.size property values. This will also block until the broker has acknowledged receipt according to the producer's acks property. If the configOverrides is not null or empty, a new ProducerFactory will be created using ProducerFactory.copyWithConfigurationOverride(java.util.Map) The factory shall apply the overrides after the supplied factory's properties. The ProducerPostProcessor s from the original factory are copied over to keep instrumentation alive. Registered ProducerFactory.Listener s are also added to the new factory. If the factory implementation does not support the copy operation, a generic copy of the ProducerFactory is created which will be of type DefaultKafkaProducerFactory.

    Parameters:
    producerFactory - the producer factory.
    autoFlush - true to flush after each send.
    configOverrides - producer configuration properties to override.
    Since:
    See Also:
  • Producer.flush()
  • setApplicationContext

    public void setApplicationContext ( ApplicationContext applicationContext)
    Specified by:
    setApplicationContext in interface ApplicationContextAware

    getDefaultTopic

    public String getDefaultTopic ()
    The default topic for send methods where a topic is not provided.
    Returns:
    the topic.

    setDefaultTopic

    public void setDefaultTopic ( String defaultTopic)
    Set the default topic for send methods where a topic is not provided.
    Parameters:
    defaultTopic - the topic.

    setProducerListener

    public void setProducerListener ( @Nullable ProducerListener < K , V > producerListener)
    Set a ProducerListener which will be invoked when Kafka acknowledges a send operation. By default a LoggingProducerListener is configured which logs errors only.
    Parameters:
    producerListener - the listener; may be null .

    getMessageConverter

    public RecordMessageConverter getMessageConverter ()
    Return the message converter.
    Returns:
    the message converter.

    setMessageConverter

    public void setMessageConverter ( RecordMessageConverter messageConverter)
    Set the message converter to use.
    Parameters:
    messageConverter - the message converter.

    setMessagingConverter

    public void setMessagingConverter ( SmartMessageConverter messageConverter)
    Set the SmartMessageConverter to use with the default MessagingMessageConverter . Not allowed when a custom messageConverter is provided.
    Parameters:
    messageConverter - the converter.
    Since:
    2.7.1

    isTransactional

    public boolean isTransactional ()
    Description copied from interface: KafkaOperations
    Return true if the implementation supports transactions (has a transaction-capable producer factory).
    Specified by:
    isTransactional in interface KafkaOperations < K , V >
    Returns:
    true or false.

    setTransactionIdPrefix

    public void setTransactionIdPrefix ( String transactionIdPrefix)
    Set a transaction id prefix to override the prefix in the producer factory.
    Parameters:
    transactionIdPrefix - the prefix.
    Since:

    setCloseTimeout

    public void setCloseTimeout ( Duration closeTimeout)
    Set the maximum time to wait when closing a producer; default 5 seconds.
    Parameters:
    closeTimeout - the close timeout.
    Since:
    2.1.14

    setAllowNonTransactional

    public void setAllowNonTransactional (boolean allowNonTransactional)
    Set to true to allow a non-transactional send when the template is transactional.
    Parameters:
    allowNonTransactional - true to allow.
    Since:
    2.4.3

    isAllowNonTransactional

    public boolean isAllowNonTransactional ()
    Description copied from interface: KafkaOperations
    Return true if this template, when transactional, allows non-transactional operations.
    Specified by:
    isAllowNonTransactional in interface KafkaOperations < K , V >
    Returns:
    true to allow.

    setMicrometerEnabled

    public void setMicrometerEnabled (boolean micrometerEnabled)
    Set to false to disable micrometer timers, if micrometer is on the class path.
    Parameters:
    micrometerEnabled - false to disable.
    Since:

    setMicrometerTags

    public void setMicrometerTags ( Map < String , String > tags)
    Set additional tags for the Micrometer listener timers.
    Parameters:
    tags - the tags.
    Since:

    setMicrometerTagsProvider

    public void setMicrometerTagsProvider ( @Nullable Function <org.apache.kafka.clients.producer.ProducerRecord<?, ?>, Map < String , String >> micrometerTagsProvider)
    Set a function to provide dynamic tags based on the producer record. These tags will be added to any static tags provided in micrometerTags . Only applies to record listeners, ignored for batch listeners. Does not apply if observation is enabled.
    Parameters:
    micrometerTagsProvider - the micrometerTagsProvider.
    Since:
    2.9.8
    See Also:
  • setMicrometerEnabled(boolean)
  • setMicrometerTags(Map)
  • setObservationEnabled(boolean)
  • getMicrometerTagsProvider

    @Nullable public Function <org.apache.kafka.clients.producer.ProducerRecord<?, ?>, Map < String , String >> getMicrometerTagsProvider ()
    Return the Micrometer tags provider.
    Returns:
    the micrometerTagsProvider.
    Since:
    2.9.8

    getProducerFactory

    public ProducerFactory < K , V > getProducerFactory ()
    Return the producer factory used by this template.
    Specified by:
    getProducerFactory in interface KafkaOperations < K , V >
    Returns:
    the factory.
    Since:
    2.2.5

    getProducerFactory

    protected ProducerFactory < K , V > getProducerFactory ( String topic)
    Return the producer factory used by this template based on the topic. The default implementation returns the only producer factory.
    Parameters:
    topic - the topic.
    Returns:
    the factory.
    Since:

    setConsumerFactory

    public void setConsumerFactory ( ConsumerFactory < K , V > consumerFactory)
    Set a consumer factory for receive operations.
    Parameters:
    consumerFactory - the consumer factory.
    Since:

    setProducerInterceptor

    public void setProducerInterceptor (org.apache.kafka.clients.producer.ProducerInterceptor< K , V > producerInterceptor)
    Set a producer interceptor on this template.
    Parameters:
    producerInterceptor - the producer interceptor
    Since:

    setObservationEnabled

    public void setObservationEnabled (boolean observationEnabled)
    Set to true to enable observation via Micrometer.
    Parameters:
    observationEnabled - true to enable.
    Since:
    See Also:
  • setMicrometerEnabled(boolean)
  • setObservationConvention

    public void setObservationConvention ( KafkaTemplateObservationConvention observationConvention)
    Parameters:
    observationConvention - the convention.
    Since:
    @Nullable public KafkaAdmin getKafkaAdmin ()
    Return the KafkaAdmin , used to find the cluster id for observation, if present.
    Returns:
    the kafkaAdmin
    Since:
    3.0.5

    setKafkaAdmin

    public void setKafkaAdmin ( KafkaAdmin kafkaAdmin)
    Set the KafkaAdmin , used to find the cluster id for observation, if present.
    Parameters:
    kafkaAdmin - the admin.
    Description copied from interface: KafkaOperations
    Send the data to the default topic with no key or partition.
    Specified by:
    sendDefault in interface KafkaOperations < K , V >
    Parameters:
    data - The data.
    Returns:
    a Future for the SendResult .
    V data)
    Description copied from interface: KafkaOperations
    Send the data to the default topic with the provided key and no partition.
    Specified by:
    sendDefault in interface KafkaOperations < K , V >
    Parameters:
    key - the key.
    data - The data.
    Returns:
    a Future for the SendResult .

    sendDefault

    public CompletableFuture < SendResult < K , V >> sendDefault ( Integer partition, K key, @Nullable V data)
    Description copied from interface: KafkaOperations
    Send the data to the default topic with the provided key and partition.
    Specified by:
    sendDefault in interface KafkaOperations < K , V >
    Parameters:
    partition - the partition.
    key - the key.
    data - the data.
    Returns:
    a Future for the SendResult .

    sendDefault

    public CompletableFuture < SendResult < K , V >> sendDefault ( Integer partition, Long timestamp, K key, @Nullable V data)
    Description copied from interface: KafkaOperations
    Send the data to the default topic with the provided key and partition.
    Specified by:
    sendDefault in interface KafkaOperations < K , V >
    Parameters:
    partition - the partition.
    timestamp - the timestamp of the record.
    key - the key.
    data - the data.
    Returns:
    a Future for the SendResult .
    V data)
    Description copied from interface: KafkaOperations
    Send the data to the provided topic with no key or partition.
    Specified by:
    send in interface KafkaOperations < K , V >
    Parameters:
    topic - the topic.
    data - The data.
    Returns:
    a Future for the SendResult .
    V data)
    Description copied from interface: KafkaOperations
    Send the data to the provided topic with the provided key and no partition.
    Specified by:
    send in interface KafkaOperations < K , V >
    Parameters:
    topic - the topic.
    key - the key.
    data - The data.
    Returns:
    a Future for the SendResult .
    Description copied from interface: KafkaOperations
    Send the data to the provided topic with the provided key and partition.
    Specified by:
    send in interface KafkaOperations < K , V >
    Parameters:
    topic - the topic.
    partition - the partition.
    key - the key.
    data - the data.
    Returns:
    a Future for the SendResult .
    public CompletableFuture < SendResult < K , V >> send ( String topic, Integer partition, Long timestamp, K key, @Nullable V data)
    Description copied from interface: KafkaOperations
    Send the data to the provided topic with the provided key and partition.
    Specified by:
    send in interface KafkaOperations < K , V >
    Parameters:
    topic - the topic.
    partition - the partition.
    timestamp - the timestamp of the record.
    key - the key.
    data - the data.
    Returns:
    a Future for the SendResult .
    public CompletableFuture < SendResult < K , V >> send (org.apache.kafka.clients.producer.ProducerRecord< K , V > record)
    Description copied from interface: KafkaOperations
    Send the provided ProducerRecord .
    Specified by:
    send in interface KafkaOperations < K , V >
    Parameters:
    record - the record.
    Returns:
    a Future for the SendResult .
    public CompletableFuture < SendResult < K , V >> send ( Message <?> message)
    Description copied from interface: KafkaOperations
    Send a message with routing information in message headers. The message payload may be converted before sending.
    Specified by:
    send in interface KafkaOperations < K , V >
    Parameters:
    message - the message to send.
    Returns:
    a Future for the SendResult .
    See Also:
  • KafkaHeaders.TOPIC
  • KafkaHeaders.PARTITION
  • KafkaHeaders.KEY
  • partitionsFor

    public List <org.apache.kafka.common.PartitionInfo> partitionsFor ( String topic)
    Description copied from interface: KafkaOperations
    See Producer.partitionsFor(String) .
    Specified by:
    partitionsFor in interface KafkaOperations < K , V >
    Parameters:
    topic - the topic.
    Returns:
    the partition info.

    metrics

    public Map <org.apache.kafka.common.MetricName, ? extends org.apache.kafka.common.Metric> metrics ()
    Description copied from interface: KafkaOperations
    See Producer.metrics() .
    Specified by:
    metrics in interface KafkaOperations < K , V >
    Returns:
    the metrics.

    execute

    public <T> T execute ( KafkaOperations.ProducerCallback < K , V , T> callback)
    Description copied from interface: KafkaOperations
    Execute some arbitrary operation(s) on the producer and return the result.
    Specified by:
    execute in interface KafkaOperations < K , V >
    Type Parameters:
    T - the result type.
    Parameters:
    callback - the callback.
    Returns:
    the result.

    executeInTransaction

    public <T> T executeInTransaction ( KafkaOperations.OperationsCallback < K , V , T> callback)
    Description copied from interface: KafkaOperations
    Execute some arbitrary operation(s) on the operations and return the result. The operations are invoked within a local transaction and do not participate in a global transaction (if present).
    Specified by:
    executeInTransaction in interface KafkaOperations < K , V >
    Type Parameters:
    T - the result type.
    Parameters:
    callback - the callback.
    Returns:
    the result.
    public void flush ()
    Flush the producer.

    Note It only makes sense to invoke this method if the ProducerFactory serves up a singleton producer (such as the DefaultKafkaProducerFactory ).

    Specified by:
    flush in interface KafkaOperations < K , V >

    sendOffsetsToTransaction

    public void sendOffsetsToTransaction ( Map <org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata)
    Description copied from interface: KafkaOperations
    When running in a transaction, send the consumer offset(s) to the transaction. It is not necessary to call this method if the operations are invoked on a listener container thread (and the listener container is configured with a KafkaAwareTransactionManager ) since the container will take care of sending the offsets to the transaction. Use with 2.5 brokers or later.
    Specified by:
    sendOffsetsToTransaction in interface KafkaOperations < K , V >
    Parameters:
    offsets - The offsets.
    groupMetadata - the consumer group metadata.
    See Also:
  • Producer.sendOffsetsToTransaction(Map, ConsumerGroupMetadata)
  • receive

    @Nullable public org.apache.kafka.clients.consumer.ConsumerRecord< K , V > receive ( String topic, int partition, long offset, Duration pollTimeout)
    Description copied from interface: KafkaOperations
    Receive a single record.
    Specified by:
    receive in interface KafkaOperations < K , V >
    Parameters:
    topic - the topic.
    partition - the partition.
    offset - the offset.
    pollTimeout - the timeout.
    Returns:
    the record or null.

    receive

    public org.apache.kafka.clients.consumer.ConsumerRecords< K , V > receive ( Collection < TopicPartitionOffset > requested, Duration pollTimeout)
    Description copied from interface: KafkaOperations
    Receive multiple records. Only absolute, positive offsets are supported.
    Specified by:
    receive in interface KafkaOperations < K , V >
    Parameters:
    requested - a collection of record requests (topic/partition/offset).
    pollTimeout - the timeout.
    Returns:
    the record or null.

    closeProducer

    protected void closeProducer (org.apache.kafka.clients.producer.Producer< K , V > producer, boolean inTx)

    doSend

    protected CompletableFuture < SendResult < K , V >> doSend (org.apache.kafka.clients.producer.ProducerRecord< K , V > producerRecord, io.micrometer.observation.Observation observation)
    Send the producer record.
    Parameters:
    producerRecord - the producer record.
    observation - the observation.
    Returns:
    a Future for the RecordMetadata .

    inTransaction

    public boolean inTransaction ()
    Return true if the template is currently running in a transaction on the calling thread.
    Specified by:
    inTransaction in interface KafkaOperations < K , V >
    Returns:
    true if a transaction is running.
    Since:
    2.2.1

    getTheProducer

    protected org.apache.kafka.clients.producer.Producer< K , V > getTheProducer ( @Nullable String topic)