All Implemented Interfaces:
EventListener, Aware, BeanNameAware, DisposableBean, SmartInitializingSingleton, ApplicationContextAware, ApplicationListener<ContextStoppedEvent>, KafkaOperations<K,V>
Direct Known Subclasses:
ReplyingKafkaTemplate, RoutingKafkaTemplate
A template for executing high-level operations. When used with a DefaultKafkaProducerFactory, the template is thread-safe. The producer factory and KafkaProducer ensure this; refer to their respective javadocs.
Author:
Marius Bogoevici, Gary Russell, Igor Stepanov, Artem Bilan, Biju Kunjummen, Endika Gutierrez, Thomas Strauß, Soby Chacko, Gurps Bassi
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.kafka.core.KafkaOperations
KafkaOperations.OperationsCallback<K,V,T>, KafkaOperations.ProducerCallback<K,V,T>
Create an instance using the supplied producer factory and autoFlush false.
Create an instance using the supplied producer factory and autoFlush setting.
Create an instance using the supplied producer factory and autoFlush setting.
Create an instance using the supplied producer factory and properties, with autoFlush false.
protected void closeProducer(org.apache.kafka.clients.producer.Producer<K,V> producer, boolean inTx)
doSend(org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord, io.micrometer.observation.Observation observation)
Send the producer record.
Execute some arbitrary operation(s) on the producer and return the result.
Execute some arbitrary operation(s) on the operations and return the result.
Flush the producer.
The default topic for send methods where a topic is not provided.
Return the KafkaAdmin, used to find the cluster id for observation, if present.
Return the message converter.
Return the Micrometer tags provider.
Return the producer factory used by this template.
Return the producer factory used by this template based on the topic.
protected org.apache.kafka.clients.producer.Producer<K,V>
boolean
Return true if the template is currently running in a transaction on the calling thread.
boolean
Return true if this template, when transactional, allows non-transactional operations.
boolean
Return true if the implementation supports transactions (has a transaction-capable producer factory).
Map<org.apache.kafka.common.MetricName, ? extends org.apache.kafka.common.Metric>
See Producer.metrics().
List<org.apache.kafka.common.PartitionInfo>
See Producer.partitionsFor(String).
org.apache.kafka.clients.consumer.ConsumerRecord<K,V>
Receive a single record.
org.apache.kafka.clients.consumer.ConsumerRecords<K,V>
Receive multiple records.
Send the data to the provided topic with the provided key and partition.
Send the data to the provided topic with the provided key and partition.
Send the data to the provided topic with the provided key and no partition.
Send the data to the provided topic with no key or partition.
send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
Send the provided ProducerRecord.
Send a message with routing information in message headers.
Send the data to the default topic with the provided key and partition.
Send the data to the default topic with the provided key and partition.
Send the data to the default topic with the provided key and no partition.
Send the data to the default topic with no key or partition.
sendOffsetsToTransaction(Map<org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata)
When running in a transaction, send the consumer offset(s) to the transaction.
Set to true to allow a non-transactional send when the template is transactional.
Set the maximum time to wait when closing a producer; default 5 seconds.
Set a consumer factory for receive operations.
Set the default topic for send methods where a topic is not provided.
Set the KafkaAdmin, used to find the cluster id for observation, if present.
Set the message converter to use.
Set to false to disable micrometer timers, if micrometer is on the class path.
Set additional tags for the Micrometer listener timers.
Set a function to provide dynamic tags based on the producer record.
Set to true to enable observation via Micrometer.
Set a producer interceptor on this template.
Set a ProducerListener which will be invoked when Kafka acknowledges a send operation.
Set a transaction id prefix to override the prefix in the producer factory.
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.kafka.core.KafkaOperations
receive, receive
KafkaTemplate
Create an instance using the supplied producer factory and autoFlush false.
Parameters:
producerFactory - the producer factory.
Create an instance using the supplied producer factory and properties, with autoFlush false. If configOverrides is not null or empty, a new DefaultKafkaProducerFactory will be created with merged producer properties, with the overrides applied after the supplied factory's properties.
Parameters:
producerFactory - the producer factory.
configOverrides - producer configuration properties to override.
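The merge order described here (the supplied factory's properties first, then the overrides) can be pictured with plain maps. The following is a minimal sketch using only JDK collections; `ConfigOverrideSketch` and `merge` are hypothetical names for illustration and are not part of Spring for Apache Kafka, where the actual merging happens inside the producer factory.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not a Spring for Apache Kafka class.
final class ConfigOverrideSketch {

    // Copy the factory's properties first, then apply the overrides,
    // so an override wins whenever both maps define the same key.
    static Map<String, Object> merge(Map<String, Object> factoryProps,
            Map<String, Object> configOverrides) {
        Map<String, Object> merged = new HashMap<>(factoryProps);
        if (configOverrides != null) {
            merged.putAll(configOverrides);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> factoryProps =
                Map.of("bootstrap.servers", "localhost:9092", "linger.ms", 5);
        Map<String, Object> overrides = Map.of("linger.ms", 50);
        // The override replaces the factory's linger.ms; other keys are kept.
        System.out.println(merge(factoryProps, overrides));
    }
}
```

Keys that appear only in the supplied factory's properties survive unchanged, which is why a template built this way can share most of its configuration with the original factory.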
Since:
KafkaTemplate
Create an instance using the supplied producer factory and autoFlush setting.
Set autoFlush to true if you wish for the send operations on this template to occur immediately, regardless of the linger.ms or batch.size property values. This will also block until the broker has acknowledged receipt according to the producer's acks property.
Parameters:
producerFactory - the producer factory.
autoFlush - true to flush after each send.
See Also:
Producer.flush()
KafkaTemplate
Create an instance using the supplied producer factory and autoFlush setting.
Set autoFlush to true if you wish for the send operations on this template to occur immediately, regardless of the linger.ms or batch.size property values. This will also block until the broker has acknowledged receipt according to the producer's acks property. If configOverrides is not null or empty, a new ProducerFactory will be created using ProducerFactory.copyWithConfigurationOverride(java.util.Map). The factory shall apply the overrides after the supplied factory's properties. The ProducerPostProcessors from the original factory are copied over to keep instrumentation alive. Registered ProducerFactory.Listeners are also added to the new factory. If the factory implementation does not support the copy operation, a generic copy of the ProducerFactory is created, which will be of type DefaultKafkaProducerFactory.
Parameters:
producerFactory - the producer factory.
autoFlush - true to flush after each send.
configOverrides - producer configuration properties to override.
Since:
See Also:
Producer.flush()
setApplicationContext
Specified by:
setApplicationContext in interface ApplicationContextAware
getDefaultTopic
public String getDefaultTopic()
The default topic for send methods where a topic is not provided.
Returns:
the topic.
setDefaultTopic
public void setDefaultTopic(String defaultTopic)
Set the default topic for send methods where a topic is not provided.
Parameters:
defaultTopic - the topic.
setProducerListener
Parameters:
producerListener - the listener; may be null.
getMessageConverter
Return the message converter.
Returns:
the message converter.
setMessageConverter
Set the message converter to use.
Parameters:
messageConverter - the message converter.
setMessagingConverter
Parameters:
messageConverter - the converter.
Since:
2.7.1
isTransactional
public boolean isTransactional()
Return true if the implementation supports transactions (has a transaction-capable producer factory).
Specified by:
isTransactional in interface KafkaOperations<K,V>
Returns:
true or false.
setTransactionIdPrefix
public void setTransactionIdPrefix(String transactionIdPrefix)
Set a transaction id prefix to override the prefix in the producer factory.
Parameters:
transactionIdPrefix - the prefix.
Since:
setCloseTimeout
public void setCloseTimeout(Duration closeTimeout)
Set the maximum time to wait when closing a producer; default 5 seconds.
Parameters:
closeTimeout - the close timeout.
Since:
2.1.14
setAllowNonTransactional
public void setAllowNonTransactional(boolean allowNonTransactional)
Set to true to allow a non-transactional send when the template is transactional.
Parameters:
allowNonTransactional - true to allow.
Since:
2.4.3
isAllowNonTransactional
public boolean isAllowNonTransactional()
Return true if this template, when transactional, allows non-transactional operations.
Specified by:
isAllowNonTransactional in interface KafkaOperations<K,V>
Returns:
true if non-transactional operations are allowed.
setMicrometerEnabled
public void setMicrometerEnabled(boolean micrometerEnabled)
Set to false to disable micrometer timers, if micrometer is on the class path.
Parameters:
micrometerEnabled - false to disable.
Since:
setMicrometerTags
Set additional tags for the Micrometer listener timers.
Parameters:
tags - the tags.
Since:
setMicrometerTagsProvider
public void setMicrometerTagsProvider(@Nullable Function<org.apache.kafka.clients.producer.ProducerRecord<?,?>, Map<String,String>> micrometerTagsProvider)
Set a function to provide dynamic tags based on the producer record. These tags will be added to any static tags provided in micrometerTags. Only applies to record listeners, ignored for batch listeners. Does not apply if observation is enabled.
Parameters:
micrometerTagsProvider - the micrometerTagsProvider.
Since:
2.9.8
See Also:
setMicrometerEnabled(boolean)
setMicrometerTags(Map)
setObservationEnabled(boolean)
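How dynamic tags are added to the static ones can be illustrated without any Kafka or Micrometer types. This is only a sketch under stated assumptions: `FakeRecord` stands in for ProducerRecord, `allTags` is a hypothetical helper, and letting a dynamic tag replace a static tag with the same name is an assumption of the sketch, not documented behavior of the template.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Illustration only: FakeRecord stands in for ProducerRecord,
// and allTags is a hypothetical helper, not a template method.
final class TagsProviderSketch {

    record FakeRecord(String topic, String key) { }

    // Start from the static tags, then add the tags the provider
    // computes for this particular record.
    static Map<String, String> allTags(Map<String, String> staticTags,
            Function<FakeRecord, Map<String, String>> provider, FakeRecord record) {
        Map<String, String> tags = new LinkedHashMap<>(staticTags);
        if (provider != null) {
            tags.putAll(provider.apply(record));
        }
        return tags;
    }

    public static void main(String[] args) {
        Map<String, String> staticTags = Map.of("app", "billing");
        // A provider in the shape the setter expects: record in, tag map out.
        Function<FakeRecord, Map<String, String>> provider =
                rec -> Map.of("topic", rec.topic());
        System.out.println(allTags(staticTags, provider, new FakeRecord("orders", "k1")));
    }
}
```

A provider of this shape is evaluated per record, so it suits tags that vary by topic or key, while tags that never change belong in the static map.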
getMicrometerTagsProvider
Return the Micrometer tags provider.
Returns:
the micrometerTagsProvider.
Since:
2.9.8
getProducerFactory
Return the producer factory used by this template.
Specified by:
getProducerFactory in interface KafkaOperations<K,V>
Returns:
the factory.
Since:
2.2.5
getProducerFactory
Return the producer factory used by this template based on the topic.
The default implementation returns the only producer factory.
Parameters:
topic - the topic.
Returns:
the factory.
Since:
setConsumerFactory
Set a consumer factory for receive operations.
Parameters:
consumerFactory - the consumer factory.
Since:
setProducerInterceptor
public void setProducerInterceptor(org.apache.kafka.clients.producer.ProducerInterceptor<K,V> producerInterceptor)
Set a producer interceptor on this template.
Parameters:
producerInterceptor - the producer interceptor.
Since:
setObservationEnabled
public void setObservationEnabled(boolean observationEnabled)
Set to true to enable observation via Micrometer.
Parameters:
observationEnabled - true to enable.
Since:
See Also:
setMicrometerEnabled(boolean)
setObservationConvention
Parameters:
observationConvention - the convention.
Since:
Return the KafkaAdmin, used to find the cluster id for observation, if present.
Returns:
the kafkaAdmin.
Since:
3.0.5
setKafkaAdmin
public void setKafkaAdmin(KafkaAdmin kafkaAdmin)
Set the KafkaAdmin, used to find the cluster id for observation, if present.
Parameters:
kafkaAdmin - the admin.
Send the data to the default topic with no key or partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
data - the data.
Returns:
a Future for the SendResult.
V
data)
Send the data to the default topic with the provided key and no partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
key - the key.
data - the data.
Returns:
a Future for the SendResult.
sendDefault
Send the data to the default topic with the provided key and partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
partition - the partition.
key - the key.
data - the data.
Returns:
a Future for the SendResult.
sendDefault
Send the data to the default topic with the provided key, partition and timestamp.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
partition - the partition.
timestamp - the timestamp of the record.
key - the key.
data - the data.
Returns:
a Future for the SendResult.
V
data)
Send the data to the provided topic with no key or partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
data - the data.
Returns:
a Future for the SendResult.
V
data)
Send the data to the provided topic with the provided key and no partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
key - the key.
data - the data.
Returns:
a Future for the SendResult.
Send the data to the provided topic with the provided key and partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
partition - the partition.
key - the key.
data - the data.
Returns:
a Future for the SendResult.
Send the data to the provided topic with the provided key, partition and timestamp.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
partition - the partition.
timestamp - the timestamp of the record.
key - the key.
data - the data.
Returns:
a Future for the SendResult.
Send the provided ProducerRecord.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
record - the record.
Returns:
a Future for the SendResult.
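Because the send methods return a future rather than a finished result, callers usually react to the outcome in a callback. The sketch below assumes the returned future behaves like the `CompletableFuture<SendResult<K,V>>` declared by `doSend`; `FakeSendResult` and `fakeSend` are hypothetical stand-ins so that the example runs with the JDK alone, and they are not Spring for Apache Kafka types.

```java
import java.util.concurrent.CompletableFuture;

// Stand-ins for illustration only; not Spring for Apache Kafka types.
final class SendFutureSketch {

    record FakeSendResult(String topic, int partition, long offset) { }

    // Pretends to send: completes normally, or exceptionally when the topic is blank.
    static CompletableFuture<FakeSendResult> fakeSend(String topic, String data) {
        if (topic == null || topic.isBlank()) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("no topic"));
        }
        return CompletableFuture.completedFuture(new FakeSendResult(topic, 0, 42L));
    }

    // handle() sees either the result or the exception, never both.
    // join() is acceptable here only because the fake future is already complete.
    static String describe(CompletableFuture<FakeSendResult> future) {
        return future
                .handle((result, ex) -> ex == null
                        ? "sent to " + result.topic() + "-" + result.partition()
                                + "@" + result.offset()
                        : "failed: " + ex.getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(fakeSend("orders", "payload")));
        System.out.println(describe(fakeSend("", "payload")));
    }
}
```

With a real template the same style of callback could be attached to the future a send method returns, which keeps the sending thread free while the broker acknowledgement is pending.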
Send a message with routing information in message headers. The message payload
may be converted before sending.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
message - the message to send.
Returns:
a Future for the SendResult.
See Also:
KafkaHeaders.TOPIC
KafkaHeaders.PARTITION
KafkaHeaders.KEY
partitionsFor
public List<org.apache.kafka.common.PartitionInfo> partitionsFor(String topic)
See Producer.partitionsFor(String).
Specified by:
partitionsFor in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
Returns:
the partition info.
metrics
public Map<org.apache.kafka.common.MetricName, ? extends org.apache.kafka.common.Metric> metrics()
See Producer.metrics().
Specified by:
metrics in interface KafkaOperations<K,V>
Returns:
the metrics.
execute
Execute some arbitrary operation(s) on the producer and return the result.
Specified by:
execute in interface KafkaOperations<K,V>
Type Parameters:
T - the result type.
Parameters:
callback - the callback.
Returns:
the result.
executeInTransaction
Execute some arbitrary operation(s) on the operations and return the result. The operations are invoked within a local transaction and do not participate in a global transaction (if present).
Specified by:
executeInTransaction in interface KafkaOperations<K,V>
Type Parameters:
T - the result type.
Parameters:
callback - the callback.
Returns:
the result.
public void flush()
Specified by:
flush in interface KafkaOperations<K,V>
sendOffsetsToTransaction
public void sendOffsetsToTransaction(Map<org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata)
When running in a transaction, send the consumer offset(s) to the transaction. It is not necessary to call this method if the operations are invoked on a listener container thread (and the listener container is configured with a KafkaAwareTransactionManager) since the container will take care of sending the offsets to the transaction. Use with 2.5 brokers or later.
Specified by:
sendOffsetsToTransaction in interface KafkaOperations<K,V>
Parameters:
offsets - the offsets.
groupMetadata - the consumer group metadata.
See Also:
Producer.sendOffsetsToTransaction(Map, ConsumerGroupMetadata)
receive
@Nullable public org.apache.kafka.clients.consumer.ConsumerRecord<K,V> receive(String topic, int partition, long offset, Duration pollTimeout)
Receive a single record.
Specified by:
receive in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
partition - the partition.
offset - the offset.
pollTimeout - the timeout.
Returns:
the record or null.
receive
Receive multiple records. Only absolute, positive offsets are supported.
Specified by:
receive in interface KafkaOperations<K,V>
Parameters:
requested - a collection of record requests (topic/partition/offset).
pollTimeout - the timeout.
Returns:
the record or null.
closeProducer
protected void closeProducer(org.apache.kafka.clients.producer.Producer<K,V> producer, boolean inTx)
doSend
protected CompletableFuture<SendResult<K,V>> doSend(org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord, io.micrometer.observation.Observation observation)
Send the producer record.
Parameters:
producerRecord - the producer record.
observation - the observation.
Returns:
a Future for the SendResult.
inTransaction
public boolean inTransaction()
Return true if the template is currently running in a transaction on the calling thread.
Specified by:
inTransaction in interface KafkaOperations<K,V>
Returns:
true if a transaction is running.
Since:
2.2.1
getTheProducer
protected org.apache.kafka.clients.producer.Producer<K,V> getTheProducer(@Nullable String topic)