@Repeatable ( KafkaListeners.class ) public @interface KafkaListener
Annotation that marks a method to be the target of a Kafka message listener on the specified topics. The containerFactory() identifies the KafkaListenerContainerFactory to use to build the Kafka listener container. If not set, a default container factory is assumed to be available with a bean name of kafkaListenerContainerFactory unless an explicit default has been provided through configuration. Processing of @KafkaListener annotations is performed by registering a KafkaListenerAnnotationBeanPostProcessor . This can be done manually or, more conveniently, through EnableKafka annotation. Annotated methods are allowed to have flexible signatures similar to what MessageMapping provides, that is
  • ConsumerRecord to access to the raw Kafka message
  • Acknowledgment to manually ack
  • @Payload -annotated method arguments including the support of validation
  • @Header -annotated method arguments to extract a specific header value, defined by KafkaHeaders
  • @Headers -annotated argument that must also be assignable to Map for getting access to all headers.
  • MessageHeaders arguments for getting access to all headers.
  • MessageHeaderAccessor for convenient access to all method arguments.
  • When defined at the method level, a listener container is created for each method. The MessageListener is a MessagingMessageListenerAdapter , configured with a MethodKafkaListenerEndpoint .

    When defined at the class level, a single message listener container is used to service all methods annotated with @KafkaHandler . Method signatures of such annotated methods must not cause any ambiguity such that a single method can be resolved for a particular inbound message. The MessagingMessageListenerAdapter is configured with a MultiMethodKafkaListenerEndpoint .

    Author:
    Gary Russell, Venil Noronha
    See Also:
  • EnableKafka
  • KafkaListenerAnnotationBeanPostProcessor
  • KafkaListeners
  • Set to true or false, to override the default setting in the container factory.
    Override the container factory's batchListener property.
    A pseudo bean name used in SpEL expressions within this annotation to reference the current bean within which this listener is defined.
    When provided, overrides the client id property in the consumer factory configuration.
    Override the container factory's concurrency setting for this listener.
    The bean name of the KafkaListenerContainerFactory to use to create the message listener container responsible to serve this endpoint.
    If provided, the listener container for this listener will be added to a bean with this value as its name, of type Collection<MessageListenerContainer> .
    Set the bean name of a SmartMessageConverter (such as the CompositeMessageConverter ) to use in conjunction with the MessageHeaders.CONTENT_TYPE header to perform the conversion to the required type.
    Set an KafkaListenerErrorHandler bean name to invoke if the listener method throws an exception.
    Set an RecordFilterStrategy bean name to override the strategy configured on the container factory.
    Override the group.id property for the consumer factory with this value for this listener only.
    The unique identifier of the container for this listener.
    boolean
    When groupId is not provided, use the id (if provided) as the group.id property for the consumer.
    Static information that will be added as a header with key KafkaHeaders.LISTENER_INFO .
    Kafka consumer properties; they will supersede any properties with the same name defined in the consumer factory (if the consumer factory supports property overrides).
    boolean
    When false and the return type is an Iterable return the result as the value of a single reply record instead of individual records for each element.
    The topicPartitions for this listener when using manual topic/partition assignment.
    The topic pattern for this listener.
    The topics for this listener.
    The unique identifier of the container for this listener.

    If none is specified an auto-generated id is used.

    Note: When provided, this value will override the group id property in the consumer factory configuration, unless idIsGroup() is set to false or groupId() is provided.

    SpEL #{...} and property place holders ${...} are supported.

    Returns:
    the id for the container managing for this endpoint.
    See Also:
  • KafkaListenerEndpointRegistry.getListenerContainer(String)
  • containerFactory

    String containerFactory
    The bean name of the KafkaListenerContainerFactory to use to create the message listener container responsible to serve this endpoint. If not specified, the default container factory is used, if any. If a SpEL expression is provided ( #{...} ), the expression can either evaluate to a container factory instance or a bean name.

    Returns:
    the container factory bean name.
    Default:
    String [] topics
    The topics for this listener. The entries can be 'topic name', 'property-placeholder keys' or 'expressions'. An expression must be resolved to the topic name. This uses group management and Kafka will assign partitions to group members. Mutually exclusive with topicPattern() and topicPartitions() .

    Returns:
    the topic names or expressions (SpEL) to listen to.
    Default:

    topicPattern

    String topicPattern
    The topic pattern for this listener. The entries can be 'topic pattern', a 'property-placeholder key' or an 'expression'. The framework will create a container that subscribes to all topics matching the specified pattern to get dynamically assigned partitions. The pattern matching will be performed periodically against topics existing at the time of check. An expression must be resolved to the topic pattern (String or Pattern result types are supported). This uses group management and Kafka will assign partitions to group members. Mutually exclusive with topics() and topicPartitions() .

    Returns:
    the topic pattern or expression (SpEL).
    See Also:
  • CommonClientConfigs.METADATA_MAX_AGE_CONFIG
  • topicPartitions

    TopicPartition [] topicPartitions
    The topicPartitions for this listener when using manual topic/partition assignment. Mutually exclusive with topicPattern() and topics() .

    Returns:
    the topic names or expressions (SpEL) to listen to.
    Default:

    containerGroup

    String containerGroup
    If provided, the listener container for this listener will be added to a bean with this value as its name, of type Collection<MessageListenerContainer> . This allows, for example, iteration over the collection to start/stop a subset of containers. The Collection beans are deprecated as of version 2.7.3 and will be removed in 2.8. Instead, a bean with name containerGroup + ".group" and type ContainerGroup should be used instead. SpEL #{...} and property place holders ${...} are supported.

    Returns:
    the bean name for the group.
    Default:
    String errorHandler
    Set an KafkaListenerErrorHandler bean name to invoke if the listener method throws an exception. If a SpEL expression is provided ( #{...} ), the expression can either evaluate to a KafkaListenerErrorHandler instance or a bean name.
    Returns:
    the error handler.
    Since:
    Default:

    groupId

    String groupId
    Override the group.id property for the consumer factory with this value for this listener only.

    SpEL #{...} and property place holders ${...} are supported.

    Returns:
    the group id.
    Since:
    Default:
    boolean idIsGroup
    When groupId is not provided, use the id (if provided) as the group.id property for the consumer. Set to false, to use the group.id from the consumer factory.
    Returns:
    false to disable.
    Since:
    Default:

    clientIdPrefix

    String clientIdPrefix
    When provided, overrides the client id property in the consumer factory configuration. A suffix ('-n') is added for each container instance to ensure uniqueness when concurrency is used.

    SpEL #{...} and property place holders ${...} are supported.

    Returns:
    the client id prefix.
    Since:
    2.1.1
    Default:

    beanRef

    String beanRef
    A pseudo bean name used in SpEL expressions within this annotation to reference the current bean within which this listener is defined. This allows access to properties and methods within the enclosing bean. Default '__listener'. Example: topics = "#{__listener.topicList}" .

    Returns:
    the pseudo bean name.
    Since:
    2.1.2
    Default:
    "__listener"

    concurrency

    String concurrency
    Override the container factory's concurrency setting for this listener. May be a property placeholder or SpEL expression that evaluates to a Number , in which case Number.intValue() is used to obtain the value.

    SpEL #{...} and property place holders ${...} are supported.

    Returns:
    the concurrency.
    Since:
    Default:

    autoStartup

    String autoStartup
    Set to true or false, to override the default setting in the container factory. May be a property placeholder or SpEL expression that evaluates to a Boolean or a String , in which case the Boolean.parseBoolean(String) is used to obtain the value.

    SpEL #{...} and property place holders ${...} are supported.

    Returns:
    true to auto start, false to not auto start.
    Since:
    Default:

    properties

    String [] properties
    Kafka consumer properties; they will supersede any properties with the same name defined in the consumer factory (if the consumer factory supports property overrides). Supported Syntax

    The supported syntax for key-value pairs is the same as the syntax defined for entries in a Java properties file :

  • key=value
  • key:value
  • key value
  • group.id and client.id are ignored.

    SpEL #{...} and property place holders ${...} are supported. SpEL expressions must resolve to a String , a @{link String[]} or a Collection<String> where each member of the array or collection is a property name + value with the above formats.

    Returns:
    the properties.
    Since:
    2.2.4
    See Also:
  • ConsumerConfig
  • groupId()
  • clientIdPrefix()
  • splitIterables

    boolean splitIterables
    When false and the return type is an Iterable return the result as the value of a single reply record instead of individual records for each element. Default true. Ignored if the reply is of type Iterable<Message<?>> .
    Returns:
    false to create a single reply record.
    Since:
    2.3.5
    Default:
    Set the bean name of a SmartMessageConverter (such as the CompositeMessageConverter ) to use in conjunction with the MessageHeaders.CONTENT_TYPE header to perform the conversion to the required type. If a SpEL expression is provided ( #{...} ), the expression can either evaluate to a SmartMessageConverter instance or a bean name.
    Returns:
    the bean name.
    Since:
    2.7.1
    Default:
    String batch
    Override the container factory's batchListener property. The listener method signature should receive a List<?> ; refer to the reference documentation. This allows a single container factory to be used for both record and batch listeners; previously separate container factories were required.
    Returns:
    "true" for the annotated method to be a batch listener or "false" for a record listener. If not set, the container factory setting is used. SpEL and property placeholders are not supported because the listener type cannot be variable.
    Since:
    See Also:
  • Boolean.parseBoolean(String)
  • String filter
    Set an RecordFilterStrategy bean name to override the strategy configured on the container factory. If a SpEL expression is provided ( #{...} ), the expression can either evaluate to a RecordFilterStrategy instance or a bean name.
    Returns:
    the error handler.
    Since:
    2.8.4
    Default:
    Static information that will be added as a header with key KafkaHeaders.LISTENER_INFO . This can be used, for example, in a RecordInterceptor , RecordFilterStrategy or the listener itself, for any purposes. SpEL #{...} and property place holders ${...} are supported, but it must resolve to a String or byte[] . This header will be stripped out if an outbound record is created with the headers from an input record.

    Returns:
    the info.
    Since:
    2.8.4
    Default: