@Repeatable(KafkaListeners.class)
public @interface KafkaListener
Annotation that marks a method to be the target of a Kafka message listener on the
specified topics.
The containerFactory() identifies the KafkaListenerContainerFactory to use to build
the Kafka listener container. If not set, a default container factory is assumed to
be available with a bean name of kafkaListenerContainerFactory unless an explicit
default has been provided through configuration.
Processing of @KafkaListener annotations is performed by registering a
KafkaListenerAnnotationBeanPostProcessor. This can be done manually or, more
conveniently, through the @EnableKafka annotation.
Annotated methods are allowed to have flexible signatures similar to what
MessageMapping provides, that is:
ConsumerRecord to access the raw Kafka message
Acknowledgment to manually ack
@Payload-annotated method arguments, including the support of validation
@Header-annotated method arguments to extract a specific header value, defined by KafkaHeaders
@Headers-annotated argument that must also be assignable to Map for getting access to all headers
MessageHeaders arguments for getting access to all headers
MessageHeaderAccessor for convenient access to all method arguments
When defined at the method level, a listener container is created for each method.
The MessageListener is a MessagingMessageListenerAdapter, configured with a
MethodKafkaListenerEndpoint.
When defined at the class level, a single message listener container is used to
service all methods annotated with @KafkaHandler. Method signatures of such
annotated methods must not cause any ambiguity such that a single method can be
resolved for a particular inbound message. The MessagingMessageListenerAdapter is
configured with a MultiMethodKafkaListenerEndpoint.
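The ambiguity rule for class-level listeners can be illustrated with a small, framework-free sketch. HandlerDispatchSketch, register and resolve are hypothetical names invented for this illustration and are not part of Spring Kafka; the sketch only assumes that a handler is selected by the runtime type of the payload and that zero or several matches are an error.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; this is not how Spring Kafka is implemented.
final class HandlerDispatchSketch {

    // Maps the payload type a handler method accepts to that method's name.
    private final Map<Class<?>, String> handlers = new LinkedHashMap<>();

    void register(Class<?> payloadType, String methodName) {
        handlers.put(payloadType, methodName);
    }

    // Returns the single handler accepting the payload; fails if none or several match.
    String resolve(Object payload) {
        List<String> candidates = new ArrayList<>();
        for (Map.Entry<Class<?>, String> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                candidates.add(entry.getValue());
            }
        }
        if (candidates.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one handler for payload " + payload + " but found " + candidates);
        }
        return candidates.get(0);
    }

    public static void main(String[] args) {
        HandlerDispatchSketch dispatch = new HandlerDispatchSketch();
        dispatch.register(String.class, "onText");
        dispatch.register(Integer.class, "onNumber");
        System.out.println(dispatch.resolve("hello"));
        System.out.println(dispatch.resolve(42));
    }
}
```

In this sketch, registering an additional handler for CharSequence would make a String payload ambiguous, which is the kind of overlap the signatures of @KafkaHandler methods need to avoid.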
Author:
Gary Russell, Venil Noronha
See Also:
EnableKafka
KafkaListenerAnnotationBeanPostProcessor
KafkaListeners
autoStartup: Set to true or false, to override the default setting in the container factory.
batch: Override the container factory's batchListener property.
beanRef: A pseudo bean name used in SpEL expressions within this annotation to reference the current bean within which this listener is defined.
clientIdPrefix: When provided, overrides the client id property in the consumer factory configuration.
concurrency: Override the container factory's concurrency setting for this listener.
containerGroup: If provided, the listener container for this listener will be added to a bean with this value as its name, of type Collection<MessageListenerContainer>.
groupId: Override the group.id property for the consumer factory with this value for this listener only.
id: The unique identifier of the container for this listener.
idIsGroup (boolean): When groupId is not provided, use the id (if provided) as the group.id property for the consumer.
properties: Kafka consumer properties; they will supersede any properties with the same name defined in the consumer factory (if the consumer factory supports property overrides).
splitIterables (boolean): When false and the return type is an Iterable, return the result as the value of a single reply record instead of individual records for each element.
topicPartitions: The topicPartitions for this listener when using manual topic/partition assignment.
topicPattern: The topic pattern for this listener.
topics: The topics for this listener.
id
The unique identifier of the container for this listener.
If none is specified, an auto-generated id is used.
Note: When provided, this value will override the group id property in the consumer
factory configuration, unless idIsGroup() is set to false or groupId() is provided.
SpEL #{...} and property placeholders ${...} are supported.
Returns:
the id for the container managing this endpoint.
See Also:
KafkaListenerEndpointRegistry.getListenerContainer(String)
containerFactory
The bean name of the KafkaListenerContainerFactory to use to create the message
listener container responsible for serving this endpoint.
If not specified, the default container factory is used, if any. If a SpEL
expression is provided (#{...}), the expression can either evaluate to a container
factory instance or a bean name.
Returns:
the container factory bean name.
topics
The topics for this listener.
The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
An expression must be resolved to the topic name.
This uses group management and Kafka will assign partitions to group members.
Mutually exclusive with topicPattern() and topicPartitions().
Returns:
the topic names or expressions (SpEL) to listen to.
topicPattern
The topic pattern for this listener. The entries can be 'topic pattern', a
'property-placeholder key' or an 'expression'. The framework will create a
container that subscribes to all topics matching the specified pattern to get
dynamically assigned partitions. The pattern matching will be performed
periodically against topics existing at the time of check. An expression must
be resolved to the topic pattern (String or Pattern result types are supported).
This uses group management and Kafka will assign partitions to group members.
Mutually exclusive with topics() and topicPartitions().
Returns:
the topic pattern or expression (SpEL).
See Also:
CommonClientConfigs.METADATA_MAX_AGE_CONFIG
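As a rough illustration of what "topics matching the specified pattern" means, the following sketch filters a list of existing topic names with java.util.regex.Pattern. TopicPatternSketch and matchingTopics are hypothetical names, the topic names are made up, and the sketch assumes the whole topic name has to match the pattern; it does not talk to a broker.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical illustration only; the real subscription is handled by the Kafka consumer.
final class TopicPatternSketch {

    // Returns the topics whose full name matches the pattern (assumption: full match).
    static List<String> matchingTopics(Pattern pattern, List<String> existingTopics) {
        return existingTopics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("orders\\..*");
        List<String> topics = List.of("orders.eu", "orders.us", "payments");
        System.out.println(matchingTopics(pattern, topics));
    }
}
```

Because the check is repeated periodically, a topic created later that matches the pattern would be picked up on a subsequent check rather than immediately.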
topicPartitions
The topicPartitions for this listener when using manual topic/partition
assignment.
Mutually exclusive with topicPattern() and topics().
Returns:
the topic names or expressions (SpEL) to listen to.
containerGroup
If provided, the listener container for this listener will be added to a bean with
this value as its name, of type Collection<MessageListenerContainer>. This allows,
for example, iteration over the collection to start/stop a subset of containers.
The Collection beans are deprecated as of version 2.7.3 and will be removed in 2.8.
Instead, a bean with name containerGroup + ".group" and type ContainerGroup should
be used.
SpEL #{...} and property placeholders ${...} are supported.
Returns:
the bean name for the group.
errorHandler
Returns:
the error handler.
groupId
Override the group.id property for the consumer factory with this value for this
listener only.
SpEL #{...} and property placeholders ${...} are supported.
Returns:
the group id.
idIsGroup
boolean idIsGroup
When groupId is not provided, use the id (if provided) as the group.id property for
the consumer. Set to false to use the group.id from the consumer factory.
Returns:
false to disable.
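The interplay between groupId, id and idIsGroup amounts to a precedence order, which the following sketch spells out. GroupIdSketch and resolveGroupId are hypothetical names and the method is only a reading of the rules stated above, not the framework's code; an empty string stands for "not provided".

```java
// Hypothetical illustration only; it restates the documented precedence, nothing more.
final class GroupIdSketch {

    static String resolveGroupId(String groupId, String id, boolean idIsGroup, String factoryGroupId) {
        // 1. An explicit groupId always wins.
        if (groupId != null && !groupId.isEmpty()) {
            return groupId;
        }
        // 2. Otherwise the id is used as the group.id, unless idIsGroup is false.
        if (idIsGroup && id != null && !id.isEmpty()) {
            return id;
        }
        // 3. Otherwise the group.id from the consumer factory applies.
        return factoryGroupId;
    }

    public static void main(String[] args) {
        System.out.println(resolveGroupId("", "invoiceListener", true, "factoryGroup"));
        System.out.println(resolveGroupId("", "invoiceListener", false, "factoryGroup"));
        System.out.println(resolveGroupId("billing", "invoiceListener", true, "factoryGroup"));
    }
}
```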
clientIdPrefix
When provided, overrides the client id property in the consumer factory
configuration. A suffix ('-n') is added for each container instance to ensure
uniqueness when concurrency is used.
SpEL #{...} and property placeholders ${...} are supported.
Returns:
the client id prefix.
Since:
2.1.1
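To make the '-n' suffix concrete, the sketch below derives one client id per container instance from a prefix. ClientIdSketch and clientIds are hypothetical names, and the sketch assumes that n counts container instances starting at 0, which the text above does not state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; the numbering scheme is an assumption.
final class ClientIdSketch {

    // Builds one client id per container instance: prefix-0, prefix-1, ...
    static List<String> clientIds(String prefix, int concurrency) {
        List<String> ids = new ArrayList<>();
        for (int n = 0; n < concurrency; n++) {
            ids.add(prefix + "-" + n);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(clientIds("invoices", 3));
    }
}
```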
beanRef
A pseudo bean name used in SpEL expressions within this annotation to reference
the current bean within which this listener is defined. This allows access to
properties and methods within the enclosing bean.
Default '__listener'.
Example: topics = "#{__listener.topicList}".
Returns:
the pseudo bean name.
Since:
2.1.2
Default:
"__listener"
concurrency
Override the container factory's concurrency setting for this listener. May be a
property placeholder or SpEL expression that evaluates to a Number, in which case
Number.intValue() is used to obtain the value.
SpEL #{...} and property placeholders ${...} are supported.
Returns:
the concurrency.
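A short sketch of the conversion described above: a resolved Number is narrowed with Number.intValue(). ConcurrencySketch and resolve are hypothetical names; parsing a String with Integer.parseInt is an assumption about how a resolved placeholder might be handled, not something the text above specifies.

```java
// Hypothetical illustration only; String handling is an assumption.
final class ConcurrencySketch {

    static int resolve(Object resolved) {
        if (resolved instanceof Number) {
            // Number.intValue() truncates fractional values toward zero.
            return ((Number) resolved).intValue();
        }
        if (resolved instanceof String) {
            return Integer.parseInt(((String) resolved).trim());
        }
        throw new IllegalArgumentException("Cannot convert " + resolved + " to a concurrency value");
    }

    public static void main(String[] args) {
        System.out.println(resolve(3));
        System.out.println(resolve(2.9d));
        System.out.println(resolve("4"));
    }
}
```

Note that a fractional result such as 2.9 becomes 2 rather than being rounded, since intValue() performs a narrowing conversion.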
autoStartup
Set to true or false, to override the default setting in the container factory. May
be a property placeholder or SpEL expression that evaluates to a Boolean or a
String, in which case Boolean.parseBoolean(String) is used to obtain the value.
SpEL #{...} and property placeholders ${...} are supported.
Returns:
true to auto start, false to not auto start.
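The sketch below shows the two accepted result types side by side. AutoStartupSketch and resolve are hypothetical names; the only library behaviour relied on is Boolean.parseBoolean(String), which returns true solely for the text "true" (ignoring case) and false for anything else.

```java
// Hypothetical illustration only; it mirrors the Boolean/String rule described above.
final class AutoStartupSketch {

    static boolean resolve(Object resolved) {
        if (resolved instanceof Boolean) {
            return (Boolean) resolved;
        }
        if (resolved instanceof String) {
            // Only "true" (case-insensitive) yields true; "yes", "1", "" all yield false.
            return Boolean.parseBoolean((String) resolved);
        }
        throw new IllegalArgumentException("Cannot convert " + resolved + " to a boolean");
    }

    public static void main(String[] args) {
        System.out.println(resolve(Boolean.TRUE));
        System.out.println(resolve("TRUE"));
        System.out.println(resolve("yes"));
    }
}
```

A practical consequence is that a misspelled placeholder value silently disables auto startup instead of raising an error.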
properties
Kafka consumer properties; they will supersede any properties with the same name
defined in the consumer factory (if the consumer factory supports property overrides).
Supported Syntax
The supported syntax for key-value pairs is the same as the syntax defined for
entries in a Java properties file:
key=value
key:value
key value
group.id and client.id are ignored.
SpEL #{...} and property placeholders ${...} are supported.
SpEL expressions must resolve to a String, a String[] or a Collection<String> where
each member of the array or collection is a property name + value with the above
formats.
Returns:
the properties.
Since:
2.2.4
See Also:
ConsumerConfig
groupId()
clientIdPrefix()
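The three separators listed under Supported Syntax are the ones java.util.Properties accepts, so the format can be tried out with the JDK alone. ListenerPropertiesSketch and parse are hypothetical names; the sketch only demonstrates the properties-file syntax and makes no claim about how the framework parses the attribute internally.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical illustration only; it exercises java.util.Properties, not Spring Kafka.
final class ListenerPropertiesSketch {

    // Parses each entry as one line of a Java properties file.
    static Properties parse(String... entries) {
        Properties props = new Properties();
        for (String entry : entries) {
            try {
                // Properties.load accepts '=', ':' or whitespace as the key/value separator.
                props.load(new StringReader(entry));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(
                "max.poll.records=50",
                "auto.offset.reset:earliest",
                "fetch.min.bytes 1024");
        props.forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```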
splitIterables
boolean splitIterables
When false and the return type is an Iterable, return the result as the value of a
single reply record instead of individual records for each element. Default true.
Ignored if the reply is of type Iterable<Message<?>>.
Returns:
false to create a single reply record.
Since:
2.3.5
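The effect of splitIterables on reply values can be pictured with the following sketch. SplitIterablesSketch and replyValues are hypothetical names; the method returns the values that would each become one reply record and deliberately leaves out the Iterable<Message<?>> special case mentioned above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; it does not model the Iterable<Message<?>> case.
final class SplitIterablesSketch {

    // Each element of the returned list stands for the value of one reply record.
    static List<Object> replyValues(Object result, boolean splitIterables) {
        List<Object> values = new ArrayList<>();
        if (splitIterables && result instanceof Iterable) {
            // One reply record per element.
            for (Object element : (Iterable<?>) result) {
                values.add(element);
            }
        } else {
            // The whole result is the value of a single reply record.
            values.add(result);
        }
        return values;
    }

    public static void main(String[] args) {
        List<String> result = List.of("a", "b", "c");
        System.out.println(replyValues(result, true).size());
        System.out.println(replyValues(result, false).size());
    }
}
```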
contentTypeConverter
Returns:
the bean name.
Since:
2.7.1
batch
Override the container factory's batchListener property. The listener method
signature should receive a List<?>; refer to the reference documentation. This
allows a single container factory to be used for both record and batch listeners;
previously separate container factories were required.
Returns:
"true" for the annotated method to be a batch listener or "false" for a record
listener. If not set, the container factory setting is used. SpEL and property
placeholders are not supported because the listener type cannot be variable.
See Also:
Boolean.parseBoolean(String)
filter
Set a RecordFilterStrategy bean name to override the strategy configured on the
container factory. If a SpEL expression is provided (#{...}), the expression can
either evaluate to a RecordFilterStrategy instance or a bean name.
Returns:
the record filter strategy bean name or expression.
Since:
2.8.4
info
Static information that will be added as a header with key
KafkaHeaders.LISTENER_INFO. This can be used, for example, in a RecordInterceptor,
RecordFilterStrategy or the listener itself, for any purpose.
SpEL #{...} and property placeholders ${...} are supported, but it must resolve to
a String or byte[].
This header will be stripped out if an outbound record is created with the headers
from an input record.
Returns:
the info.
Since:
2.8.4