相关文章推荐
伤情的红豆  ·  c# ...·  3 月前    · 
失恋的汽水  ·  c++ ...·  4 月前    · 
潇洒的弓箭  ·  Java 循环结构 – for, ...·  1 年前    · 
想表白的水桶  ·  javascript - ant vue ...·  1 年前    · 

Some of the key (and most popular) high-level features that Spring AMQP provides are to do with recovery and automatic re-connection in the event of a protocol error or broker failure. We have seen all the relevant components already in this guide, but it should help to bring them all together here and call out the features and recovery scenarios individually.

The primary reconnection features are enabled by the CachingConnectionFactory itself. It is also often beneficial to use the RabbitAdmin auto-declaration features. In addition, if you care about guaranteed delivery, you probably also need to use the channelTransacted flag in RabbitTemplate and SimpleMessageListenerContainer and the AcknowledgeMode.AUTO (or manual if you do the acks yourself) in the SimpleMessageListenerContainer .

The RabbitAdmin component can declare exchanges, queues, and bindings on startup. It does this lazily, through a ConnectionListener . Consequently, if the broker is not present on startup, it does not matter. The first time a Connection is used (for example, by sending a message) the listener fires and the admin features is applied. A further benefit of doing the auto declarations in a listener is that, if the connection is dropped for any reason (for example, broker death, network glitch, and others), they are applied again when the connection is re-established.

Queues declared this way must have fixed names — either explicitly declared or generated by the framework for AnonymousQueue instances. Anonymous queues are non-durable, exclusive, and auto-deleting. Automatic declaration is performed only when the CachingConnectionFactory cache mode is CHANNEL (the default). This limitation exists because exclusive and auto-delete queues are bound to the connection.

Starting with version 2.2.2, the RabbitAdmin will detect beans of type DeclarableCustomizer and apply the function before actually processing the declaration. This is useful, for example, to set a new argument (property) before it has first class support within the framework.

@Bean
public DeclarableCustomizer customizer() {
    return dec -> {
        if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
            dec.addArgument("some.new.queue.argument", true);
        return dec;

It is also useful in projects that don’t provide direct access to the Declarable bean definitions.

See also RabbitMQ Automatic Connection/Topology recovery.

If you lose your connection to the broker in a synchronous sequence when using RabbitTemplate (for instance), Spring AMQP throws an AmqpException (usually, but not always, AmqpIOException). We do not try to hide the fact that there was a problem, so you have to be able to catch and respond to the exception. The easiest thing to do if you suspect that the connection was lost (and it was not your fault) is to try the operation again. You can do this manually, or you could look at using Spring Retry to handle the retry (imperatively or declaratively).

Spring Retry provides a couple of AOP interceptors and a great deal of flexibility to specify the parameters of the retry (number of attempts, exception types, backoff algorithm, and others). Spring AMQP also provides some convenience factory beans for creating Spring Retry interceptors in a convenient form for AMQP use cases, with strongly typed callback interfaces that you can use to implement custom recovery logic. See the Javadoc and properties of StatefulRetryOperationsInterceptor and StatelessRetryOperationsInterceptor for more detail. Stateless retry is appropriate if there is no transaction or if a transaction is started inside the retry callback. Note that stateless retry is simpler to configure and analyze than stateful retry, but it is not usually appropriate if there is an ongoing transaction that must be rolled back or definitely is going to roll back. A dropped connection in the middle of a transaction should have the same effect as a rollback. Consequently, for reconnections where the transaction is started higher up the stack, stateful retry is usually the best choice. Stateful retry needs a mechanism to uniquely identify a message. The simplest approach is to have the sender put a unique value in the MessageId message property. The provided message converters provide an option to do this: you can set createMessageIds to true. Otherwise, you can inject a MessageKeyGenerator implementation into the interceptor. The key generator must return a unique key for each message. In versions prior to version 2.0, a MissingMessageIdAdvice was provided. It enabled messages without a messageId property to be retried exactly once (ignoring the retry settings). This advice is no longer provided, since, along with spring-retry version 1.2, its functionality is built into the interceptor and message listener containers.

For backwards compatibility, a message with a null message ID is considered fatal for the consumer (consumer is stopped) by default (after one retry). To replicate the functionality provided by the MissingMessageIdAdvice, you can set the statefulRetryFatalWithNullMessageId property to false on the listener container. With that setting, the consumer continues to run and the message is rejected (after one retry). It is discarded or routed to the dead letter queue (if one is configured).

Starting with version 1.3, a builder API is provided to aid in assembling these interceptors by using Java (in @Configuration classes). The following example shows how to do so:

@Bean
public StatefulRetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateful()
            .maxAttempts(5)
            .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
            .build();

Only a subset of retry capabilities can be configured this way. More advanced features would need the configuration of a RetryTemplate as a Spring bean. See the Spring Retry Javadoc for complete information about available policies and their configuration.

It is not recommended to configure retry with a batch listener, unless the batch was created by the producer, in a single record. See Batched Messages for information about consumer and producer-created batches. With a consumer-created batch, the framework has no knowledge about which message in the batch caused the failure so recovery after the retries are exhausted is not possible. With producer-created batches, since there is only one message that actually failed, the whole message can be recovered. Applications may want to inform a custom recoverer where in the batch the failure occurred, perhaps by setting an index property of the thrown exception.

A retry recoverer for a batch listener must implement MessageBatchRecoverer.

If a MessageListener fails because of a business exception, the exception is handled by the message listener container, which then goes back to listening for another message. If the failure is caused by a dropped connection (not a business exception), the consumer that is collecting messages for the listener has to be cancelled and restarted. The SimpleMessageListenerContainer handles this seamlessly, and it leaves a log to say that the listener is being restarted. In fact, it loops endlessly, trying to restart the consumer. Only if the consumer is very badly behaved indeed will it give up. One side effect is that if the broker is down when the container starts, it keeps trying until a connection can be established.

Business exception handling, as opposed to protocol errors and dropped connections, might need more thought and some custom configuration, especially if transactions or container acks are in use. Prior to 2.8.x, RabbitMQ had no definition of dead letter behavior. Consequently, by default, a message that is rejected or rolled back because of a business exception can be redelivered endlessly. To put a limit on the client on the number of re-deliveries, one choice is a StatefulRetryOperationsInterceptor in the advice chain of the listener. The interceptor can have a recovery callback that implements a custom dead letter action — whatever is appropriate for your particular environment.

Another alternative is to set the container’s defaultRequeueRejected property to false. This causes all failed messages to be discarded. When using RabbitMQ 2.8.x or higher, this also facilitates delivering the message to a dead letter exchange.

Alternatively, you can throw a AmqpRejectAndDontRequeueException. Doing so prevents message requeuing, regardless of the setting of the defaultRequeueRejected property.

Starting with version 2.1, an ImmediateRequeueAmqpException is introduced to perform exactly the opposite logic: the message will be requeued, regardless of the setting of the defaultRequeueRejected property.

Often, a combination of both techniques is used. You can use a StatefulRetryOperationsInterceptor in the advice chain with a MessageRecoverer that throws an AmqpRejectAndDontRequeueException. The MessageRecover is called when all retries have been exhausted. The RejectAndDontRequeueRecoverer does exactly that. The default MessageRecoverer consumes the errant message and emits a WARN message.

Starting with version 1.3, a new RepublishMessageRecoverer is provided, to allow publishing of failed messages after retries are exhausted.

When a recoverer consumes the final exception, the message is ack’d and is not sent to the dead letter exchange by the broker, if configured.

When RepublishMessageRecoverer is used on the consumer side, the received message has deliveryMode in the receivedDeliveryMode message property. In this case the deliveryMode is null. That means a NON_PERSISTENT delivery mode on the broker. Starting with version 2.0, you can configure the RepublishMessageRecoverer for the deliveryMode to set into the message to republish if it is null. By default, it uses MessageProperties default value - MessageDeliveryMode.PERSISTENT.
@Bean
RetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateless()
            .maxAttempts(5)
            .recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
            .build();

The RepublishMessageRecoverer publishes the message with additional information in message headers, such as the exception message, stack trace, original exchange, and routing key. Additional headers can be added by creating a subclass and overriding additionalHeaders(). The deliveryMode (or any other properties) can also be changed in the additionalHeaders(), as the following example shows:

RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {
    protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
        message.getMessageProperties()
            .setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
        return null;

Starting with version 2.0.5, the stack trace may be truncated if it is too large; this is because all headers have to fit in a single frame. By default, if the stack trace would cause less than 20,000 bytes ('headroom') to be available for other headers, it will be truncated. This can be adjusted by setting the recoverer’s frameMaxHeadroom property, if you need more or less space for other headers. Starting with versions 2.1.13, 2.2.3, the exception message is included in this calculation, and the amount of stack trace will be maximized using the following algorithm:

if the stack trace alone would exceed the limit, the exception message header will be truncated to 97 bytes plus …​ and the stack trace is truncated too.

if the stack trace is small, the message will be truncated (plus …​) to fit in the available bytes (but the message within the stack trace itself is truncated to 97 bytes plus …​).

Whenever a truncation of any kind occurs, the original exception will be logged to retain the complete information. The evaluation is performed after the headers are enhanced so information such as the exception type can be used in the expressions.

Starting with version 2.4.8, the error exchange and routing key can be provided as SpEL expressions, with the Message being the root object for the evaluation.

Starting with version 2.3.3, a new subclass RepublishMessageRecovererWithConfirms is provided; this supports both styles of publisher confirms and will wait for the confirmation before returning (or throw an exception if not confirmed or the message is returned).

If the confirm type is CORRELATED, the subclass will also detect if a message is returned and throw an AmqpMessageReturnedException; if the publication is negatively acknowledged, it will throw an AmqpNackReceivedException.

If the confirm type is SIMPLE, the subclass will invoke the waitForConfirmsOrDie method on the channel.

See Publisher Confirms and Returns for more information about confirms and returns.

Starting with version 2.1, an ImmediateRequeueMessageRecoverer is added to throw an ImmediateRequeueAmqpException, which notifies a listener container to requeue the current failed message.

Spring Retry has a great deal of flexibility for determining which exceptions can invoke retry. The default configuration retries for all exceptions. Given that user exceptions are wrapped in a ListenerExecutionFailedException, we need to ensure that the classification examines the exception causes. The default classifier looks only at the top level exception.

Since Spring Retry 1.0.3, the BinaryExceptionClassifier has a property called traverseCauses (default: false). When true, it travers exception causes until it finds a match or there is no cause.

To use this classifier for retry, you can use a SimpleRetryPolicy created with the constructor that takes the max attempts, the Map of Exception instances, and the boolean (traverseCauses) and inject this policy into the RetryTemplate.

Apache®, Apache Tomcat®, Apache Kafka®, Apache Cassandra™, and Apache Geode™ are trademarks or registered trademarks of the Apache Software Foundation in the United States and/or other countries. Java™, Java™ SE, Java™ EE, and OpenJDK™ are trademarks of Oracle and/or its affiliates. Kubernetes® is a registered trademark of the Linux Foundation in the United States and other countries. Linux® is the registered trademark of Linus Torvalds in the United States and other countries. Windows® and Microsoft® Azure are registered trademarks of Microsoft Corporation. “AWS” and “Amazon Web Services” are trademarks or registered trademarks of Amazon.com Inc. or its affiliates. All other trademarks and copyrights are property of their respective owners and are only mentioned for informative purposes. Other names may be trademarks of their respective owners.