Set the ack mode to use when auto ack (in the configuration properties) is false.
Set the timeout for commitSync operations (if ConsumerProperties.isSyncCommits()). Overrides
the default API timeout property. In order of precedence:
1. this property
2. ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG in ConsumerProperties.setKafkaConsumerProperties(java.util.Properties)
3. ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG in the consumer factory properties
4. 60 seconds
Overrides:
setSyncCommitTimeout in class ConsumerProperties
Parameters:
syncCommitTimeout
- the timeout.
See Also:
ConsumerProperties.setSyncCommits(boolean)
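The order of precedence described for the sync commit timeout can be sketched in plain Java. This is an illustration under stated assumptions, not the container's actual code: the class `SyncCommitTimeoutResolver` and its `resolve` method are hypothetical, both property sources are modeled as `java.util.Properties` for simplicity, and values are assumed to be numeric strings. The key `default.api.timeout.ms` is the string value of `ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG`.

```java
import java.time.Duration;
import java.util.Properties;

/** Hypothetical helper that mirrors the documented precedence; not part of the library. */
final class SyncCommitTimeoutResolver {

    // String value of ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG.
    static final String DEFAULT_API_TIMEOUT_MS = "default.api.timeout.ms";

    // Final fallback named in the documentation.
    static final Duration FALLBACK = Duration.ofSeconds(60);

    /**
     * @param explicitTimeout the syncCommitTimeout property, or null when not set
     * @param consumerOverrides stand-in for the properties given to setKafkaConsumerProperties
     * @param factoryProps stand-in for the consumer factory properties (assumed to be Properties here)
     */
    static Duration resolve(Duration explicitTimeout, Properties consumerOverrides, Properties factoryProps) {
        if (explicitTimeout != null) {
            return explicitTimeout;                          // 1. this property
        }
        String fromOverrides = consumerOverrides.getProperty(DEFAULT_API_TIMEOUT_MS);
        if (fromOverrides != null) {
            return Duration.ofMillis(Long.parseLong(fromOverrides.trim()));  // 2. consumer property overrides
        }
        String fromFactory = factoryProps.getProperty(DEFAULT_API_TIMEOUT_MS);
        if (fromFactory != null) {
            return Duration.ofMillis(Long.parseLong(fromFactory.trim()));    // 3. consumer factory properties
        }
        return FALLBACK;                                     // 4. 60 seconds
    }
}
```

Calling `resolve(null, overrides, factoryProps)` with two empty `Properties` objects falls through to the 60 second fallback; setting the key in either source, or passing an explicit `Duration`, takes priority in the order listed.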
setIdleEventInterval
public void setIdleEventInterval(@Nullable Long idleEventInterval)
Set the idle event interval; when set, an event is emitted if a poll returns
no records and this interval has elapsed since a record was returned.
Parameters:
idleEventInterval
- the interval.
See Also:
setIdleBeforeDataMultiplier(double)
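The idle event rule described for `setIdleEventInterval` can be sketched as a small state tracker. This is a simplified illustration, not the container's implementation: `IdleEventTracker` and `afterPoll` are hypothetical names, time is passed in as epoch milliseconds, and the sketch assumes an event should be emitted at most once per interval while the consumer stays idle.

```java
/** Hypothetical sketch of idle detection; names and throttling behavior are assumptions. */
final class IdleEventTracker {

    private final long idleEventIntervalMs;
    private long lastRecordMs; // time of the last poll that returned records
    private long lastEventMs;  // time the last idle event was emitted

    IdleEventTracker(long idleEventIntervalMs, long startMs) {
        this.idleEventIntervalMs = idleEventIntervalMs;
        this.lastRecordMs = startMs;
        this.lastEventMs = startMs;
    }

    /**
     * Call after each poll.
     * @return true when an idle event should be emitted for this poll
     */
    boolean afterPoll(int recordCount, long nowMs) {
        if (recordCount > 0) {
            lastRecordMs = nowMs; // data arrived, so the consumer is not idle
            return false;
        }
        boolean idleLongEnough = nowMs - lastRecordMs >= idleEventIntervalMs;
        boolean notTooSoon = nowMs - lastEventMs >= idleEventIntervalMs; // assumed throttle
        if (idleLongEnough && notTooSoon) {
            lastEventMs = nowMs;
            return true;
        }
        return false;
    }
}
```

With an interval of 1000 ms, empty polls at 500 ms and 1000 ms after start would yield no event and then one event; a poll that returns records resets the idle clock.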
setIdleBeforeDataMultiplier
public void setIdleBeforeDataMultiplier(double idleBeforeDataMultiplier)
Parameters:
idleBeforeDataMultiplier
- the multiplier.
Since:
See Also:
setIdleEventInterval(Long)
setIdlePartitionEventInterval
public void setIdlePartitionEventInterval(@Nullable Long idlePartitionEventInterval)
Set the idle partition event interval; when set, an event is emitted if a poll returns
no records for a partition and this interval has elapsed since a record was returned.
Parameters:
idlePartitionEventInterval
- the interval.
Return the consumer task executor.
Returns:
the executor.
getIdleBeforeDataMultiplier
public double getIdleBeforeDataMultiplier()
Returns:
the idleBeforeDataMultiplier.
Since:
See Also:
getIdleEventInterval()
Return the idle partition event interval.
Returns:
the interval.
Deprecated, for removal: This API element is subject to removal in a future version.
Set the transaction manager to start a transaction; if it is a KafkaAwareTransactionManager, offsets
are committed with semantics equivalent to ContainerProperties.AckMode.RECORD or
ContainerProperties.AckMode.BATCH, depending on the listener type (record or batch). For other
transaction managers, adding the transaction manager to the container facilitates,
for example, a record or batch interceptor participating in the same transaction
(you must set the container's interceptBeforeTx property to false).
Parameters:
transactionManager
- the transaction manager.
Since:
See Also:
setAckMode(AckMode)
setKafkaAwareTransactionManager
Parameters:
kafkaAwareTransactionManager
- the transaction manager.
Since:
isBatchRecoverAfterRollback
public boolean isBatchRecoverAfterRollback()
Recover batch records after rollback if true.
Returns:
true to recover.
Since:
setBatchRecoverAfterRollback
public void setBatchRecoverAfterRollback(boolean batchRecoverAfterRollback)
Enable batch recovery after rollback.
Parameters:
batchRecoverAfterRollback
- the batchRecoverAfterRollback to set.
Since:
setMonitorInterval
public void setMonitorInterval(int monitorInterval)
The interval between checks for a non-responsive consumer in seconds; default 30.
Parameters:
monitorInterval
- the interval.
Since:
1.3.1
A scheduler used with the monitor interval.
Parameters:
scheduler
- the scheduler.
Since:
1.3.1
See Also:
setMonitorInterval(int)
setNoPollThreshold
public void setNoPollThreshold(float noPollThreshold)
If the time since the last poll divided by the poll timeout exceeds this value, a
NonResponsiveConsumerEvent is published. This value should be more than 1.0 to avoid
a race condition that can cause spurious events to be published. Default 3.0f.
Parameters:
noPollThreshold
- the threshold
Since:
1.3.1
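The threshold comparison described for `setNoPollThreshold` amounts to a simple ratio check. The sketch below is an assumption-laden illustration, not the monitor's real code: `NoPollCheck` and `isNonResponsive` are hypothetical names, and times are plain millisecond values supplied by the caller.

```java
/** Hypothetical sketch of the non-responsive consumer check. */
final class NoPollCheck {

    /**
     * @param nowMs current time in milliseconds
     * @param lastPollMs time of the last poll in milliseconds
     * @param pollTimeoutMs the configured poll timeout in milliseconds (must be positive)
     * @param noPollThreshold the configured threshold, for example 3.0f
     * @return true when (time since last poll / poll timeout) exceeds the threshold
     */
    static boolean isNonResponsive(long nowMs, long lastPollMs, long pollTimeoutMs, float noPollThreshold) {
        if (pollTimeoutMs <= 0) {
            throw new IllegalArgumentException("pollTimeoutMs must be positive");
        }
        float ratio = (float) (nowMs - lastPollMs) / pollTimeoutMs;
        return ratio > noPollThreshold;
    }
}
```

With a 5 second poll timeout and the default threshold of 3.0f, the check only reports a non-responsive consumer once more than 15 seconds have passed since the last poll. A threshold at or below 1.0 could trip while a normal poll is still waiting for its timeout, which is the race the documentation warns about.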
isLogContainerConfig
public boolean isLogContainerConfig()
Log the container configuration if true (INFO).
Returns:
true to log.
Since:
2.1.1
setLogContainerConfig
public void setLogContainerConfig(boolean logContainerConfig)
Set to true to instruct each container to log this configuration.
Parameters:
logContainerConfig
- true to log.
Since:
2.1.1
isMissingTopicsFatal
public boolean isMissingTopicsFatal()
If true, the container won't start if any of the configured topics are not present
on the broker. Does not apply when topic patterns are configured. Default false.
Returns:
the missingTopicsFatal.
Since:
setMissingTopicsFatal
public void setMissingTopicsFatal(boolean missingTopicsFatal)
Set to true to prevent the container from starting if any of the configured topics
are not present on the broker. Does not apply when topic patterns are configured.
Default false.
Parameters:
missingTopicsFatal
- the missingTopicsFatal.
Since:
setIdleBetweenPolls
public void setIdleBetweenPolls(long idleBetweenPolls)
The sleep interval in milliseconds used in the main loop between
Consumer.poll(Duration) calls. Defaults to 0 - no idling.
Parameters:
idleBetweenPolls
- the interval to sleep between polling cycles.
Since:
setMicrometerEnabled
public void setMicrometerEnabled(boolean micrometerEnabled)
Parameters:
micrometerEnabled
- false to disable.
Since:
setObservationEnabled
public void setObservationEnabled(boolean observationEnabled)
Set to true to enable observation via Micrometer. When false (default)
basic Micrometer timers are used instead (when enabled).
Parameters:
observationEnabled
- true to enable.
Since:
See Also:
setMicrometerEnabled(boolean)
setMicrometerTags
Set additional tags for the Micrometer listener timers.
Parameters:
tags
- the tags.
Since:
getMicrometerTags
Return static Micrometer tags.
Returns:
the tags.
Since:
setMicrometerTagsProvider
public void setMicrometerTagsProvider(@Nullable Function<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>, Map<String,String>> micrometerTagsProvider)
Set a function to provide dynamic tags based on the consumer record. These tags
will be added to any static tags provided in micrometerTags. Only applies to record
listeners, ignored for batch listeners. Does not apply if observation is enabled.
Parameters:
micrometerTagsProvider
- the micrometerTagsProvider.
Since:
2.9.8
See Also:
setMicrometerEnabled(boolean)
setMicrometerTags(Map)
setObservationEnabled(boolean)
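How dynamic tags from a provider function combine with static tags can be sketched generically. This is an illustration only: `ListenerTags` and `tagsFor` are hypothetical names, the record type is left as a type parameter instead of Kafka's `ConsumerRecord`, and the sketch assumes that a dynamic tag replaces a static tag with the same key, which the documentation above does not specify.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of merging static tags with per-record tags. */
final class ListenerTags {

    /**
     * @param record the record being processed (any type in this sketch)
     * @param staticTags the static tags; never modified
     * @param provider function producing per-record tags, or null when not configured
     * @return a new map containing the static tags plus any dynamic tags
     */
    static <R> Map<String, String> tagsFor(R record, Map<String, String> staticTags,
            Function<R, Map<String, String>> provider) {
        Map<String, String> merged = new LinkedHashMap<>(staticTags);
        if (provider != null) {
            // Assumption: on a key clash the dynamic value wins.
            merged.putAll(provider.apply(record));
        }
        return merged;
    }
}
```

A provider such as `record -> Collections.singletonMap("topic", record)` applied to the string `"orders"` would add a `topic=orders` tag alongside whatever static tags were configured.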
getMicrometerTagsProvider
Return the Micrometer tags provider.
Returns:
the micrometerTagsProvider.
Since:
2.9.8
setConsumerStartTimeout
public void setConsumerStartTimeout(Duration consumerStartTimeout)
Set the timeout to wait for a consumer thread to start before logging
an error. Default 30 seconds.
Parameters:
consumerStartTimeout
- the consumer start timeout.
isSubBatchPerPartition
public boolean isSubBatchPerPartition()
Return whether to split batches by partition.
Returns:
subBatchPerPartition.
Since:
2.3.2
Return whether to split batches by partition; null if not set.
Returns:
subBatchPerPartition.
Since:
setSubBatchPerPartition
When using a batch message listener, whether to dispatch records by partition (with
a transaction for each sub batch if transactions are in use) or the complete batch
received by the poll(). Useful when using transactions to enable zombie
fencing, by using a transactional.id that is unique for each
group/topic/partition. Defaults to true when using transactions with EOSMode.ALPHA
and false when not using transactions or with EOSMode.BETA.
Parameters:
subBatchPerPartition
- true for a separate transaction for each partition.
Since:
2.3.2
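Splitting a polled batch into per-partition sub batches can be sketched as a grouping step. The code below is a simplified illustration, not the container's implementation: `SubBatcher` and its nested `Rec` class are hypothetical stand-ins for the container logic and for Kafka's `ConsumerRecord`, and the `topic-partition` string key is chosen for readability only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of dispatching a batch as one sub batch per partition. */
final class SubBatcher {

    /** Minimal stand-in for a consumer record. */
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /**
     * Groups records by topic and partition, keeping the original order inside
     * each sub batch and the order in which partitions first appear.
     */
    static Map<String, List<Rec>> splitByPartition(List<Rec> batch) {
        Map<String, List<Rec>> subBatches = new LinkedHashMap<>();
        for (Rec rec : batch) {
            String key = rec.topic + "-" + rec.partition;
            subBatches.computeIfAbsent(key, k -> new ArrayList<>()).add(rec);
        }
        return subBatches;
    }
}
```

Each value in the returned map would then be handed to the batch listener separately, with its own transaction when transactions are in use, instead of passing the whole polled batch in one call.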
setAssignmentCommitOption
Parameters:
assignmentCommitOption
- the option.
Since:
2.3.6
setDeliveryAttemptHeader
public void setDeliveryAttemptHeader(boolean deliveryAttemptHeader)
Set to true to populate the KafkaHeaders.DELIVERY_ATTEMPT header when
the error handler or after rollback processor implements DeliveryAttemptAware.
There is a small overhead so this is false by default.
Parameters:
deliveryAttemptHeader
- true to populate
Since:
getEosMode
Get the exactly once semantics mode.
Returns:
the mode.
Since:
See Also:
setEosMode(EOSMode)
setEosMode
Parameters:
eosMode
- the mode; default V2.
Since:
getTransactionDefinition
Get the transaction definition.
Returns:
the definition.
Since:
2.5.4
setTransactionDefinition
Set a transaction definition with properties (e.g. timeout) that will be copied to
the container's transaction template. Note that this is only generally useful when
used with a KafkaAwareTransactionManager that supports a custom definition; this does NOT
include the KafkaTransactionManager, which has no concept of transaction timeout. It can
be useful to start, for example, a database transaction in the container, rather than using
@Transactional on the listener, because then a record interceptor, or filter in a listener
adapter, can participate in the transaction.
Parameters:
transactionDefinition
- the definition.
Since:
2.5.4
See Also:
setKafkaAwareTransactionManager(KafkaAwareTransactionManager)
setAdviceChain
public void setAdviceChain(Advice... adviceChain)
Set a chain of listener Advices; must not be null or have null elements.
Parameters:
adviceChain
- the adviceChain to set.
Since:
2.5.6
isStopContainerWhenFenced
public boolean isStopContainerWhenFenced()
When true, the container will stop after a ProducerFencedException.
Returns:
the stopContainerWhenFenced
Since:
2.5.8
setStopContainerWhenFenced
public void setStopContainerWhenFenced(boolean stopContainerWhenFenced)
Set to true to stop the container when a ProducerFencedException is thrown.
Currently, there is no way to determine if such an exception is thrown due to a
rebalance vs. a timeout. We therefore cannot call the after rollback processor. The
best solution is to ensure that the transaction.timeout.ms is large enough
so that transactions don't time out.
Parameters:
stopContainerWhenFenced
- true to stop the container.
Since:
2.5.8
isStopImmediate
public boolean isStopImmediate()
When true, the container will be stopped immediately after processing the current record.
Returns:
true to stop immediately.
Since:
2.5.11
setStopImmediate
public void setStopImmediate(boolean stopImmediate)
Set to true to stop the container after processing the current record (when stop()
is called). When false (default), the container will stop after all the results of
the previous poll are processed.
Parameters:
stopImmediate
- true to stop after the current record.
Since:
2.5.11
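The difference between the two stop behaviors can be sketched as a loop over the records from one poll. This is an illustration under stated assumptions, not the container's code: `StopSketch` and `processPolled` are hypothetical names, the listener is modeled as a `Consumer`, and the stop request is modeled as a `BooleanSupplier` that becomes true once stop() has been called.

```java
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/** Hypothetical sketch of stopImmediate semantics for a record listener. */
final class StopSketch {

    /**
     * Processes the records returned by one poll.
     * @return the number of records handed to the listener
     */
    static <T> int processPolled(List<T> records, Consumer<T> listener,
            BooleanSupplier stopRequested, boolean stopImmediate) {
        int processed = 0;
        for (T record : records) {
            listener.accept(record);
            processed++;
            // With stopImmediate, the remaining records from this poll are not processed.
            if (stopImmediate && stopRequested.getAsBoolean()) {
                break;
            }
        }
        return processed;
    }
}
```

If stop is requested while the second of three records is being handled, the sketch processes two records when `stopImmediate` is true and all three when it is false.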
isAsyncAcks
public boolean isAsyncAcks()
When true, async manual acknowledgments are supported.
Returns:
true for async ack support.
Since:
setAsyncAcks
public void setAsyncAcks(boolean asyncAcks)
Parameters:
asyncAcks
- true to use async acks.
Since:
isPauseImmediate
public boolean isPauseImmediate()
When pausing the container with a record listener, whether the pause takes effect
immediately, when the current record has been processed, or after all records from
the previous poll have been processed. Default false.
Returns:
whether to pause immediately.
Since:
setPauseImmediate
public void setPauseImmediate(boolean pauseImmediate)
Set to true to pause the container after the current record has been processed, rather
than after all the records from the previous poll have been processed.
Parameters:
pauseImmediate
- true to pause immediately.
Since:
setObservationConvention
Parameters:
observationConvention
- the convention.
Since:
getPollTimeoutWhilePaused
public Duration getPollTimeoutWhilePaused()
The poll timeout to use while paused; usually a lower number than pollTimeout.
Returns:
the pollTimeoutWhilePaused
Since:
2.9.7
setPollTimeoutWhilePaused
public void setPollTimeoutWhilePaused(Duration pollTimeoutWhilePaused)
Set the poll timeout to use while paused; usually a lower number than pollTimeout.
Should be greater than zero to avoid a tight CPU loop while the consumer is paused.
Default is 100ms.
Parameters:
pollTimeoutWhilePaused
- the pollTimeoutWhilePaused to set
Since:
2.9.7
isRestartAfterAuthExceptions
public boolean isRestartAfterAuthExceptions()
Restart the container if stopped due to an auth exception.
Returns:
the restartAfterAuthExceptions
Since:
2.9.7
setRestartAfterAuthExceptions
public void setRestartAfterAuthExceptions(boolean restartAfterAuthExceptions)
Set to true to automatically restart the container if an auth exception is
detected by the container (or all child containers).
Parameters:
restartAfterAuthExceptions
- true to restart.
Since:
2.9.7