相关文章推荐
喝醉的拖把  ·  使用 Python 将 html ...·  1 年前    · 
唠叨的火车  ·  java - How to parse a ...·  1 年前    · 
Collectives™ on Stack Overflow

Find centralized, trusted content and collaborate around the technologies you use most.

Learn more about Collectives

Teams

Q&A for work

Connect and share knowledge within a single location that is structured and easy to search.

Learn more about Teams

I have a webapp in which I use Kinesis Stream with spring-cloud-stream, spring-cloud-stream-binder-kinesis and spring-cloud-starter-sleuth. What I would like to do is to configure the error handling properly. I would like to:

  • handle a message stream
  • if something go wrong in the point 1 handle the error (specifically for each stream)
  • create a errorChannel that handle all errors globally that happened in point 2.
  • Properties:

    spring:
      cloud:
        stream:
          function:
            definition: destination1
          bindings:
            destination1-in-0:
              destination: destination1
              group: group1
    

    This is the configuration code (in a class annotated with @Configuration):

        @Bean
        public Consumer<MyStream> destination1() {
            return message -> {
                if (true) throw new RuntimeException("test");
                LOGGER.info("message: " + message);
        @ServiceActivator(inputChannel = "destination1.group1.errors")
        public void errorHandle(Message<?> message) {
            // TODO
            LOGGER.error("Handling ERROR: " + message);
            throw new RuntimeException("test2");
        @Bean
        public PublishSubscribeChannel errorChannel() {
            ErrorHandler errorHandler = throwable -> LOGGER.error("Handling ERROR in errorChannel error handler: ");
            Executor executor = new ConcurrentTaskExecutor();
            PublishSubscribeChannel publishSubscribeChannel = new PublishSubscribeChannel(executor);
            publishSubscribeChannel.setErrorHandler(errorHandler);
            return publishSubscribeChannel;
    

    Please note that step 1 and step 2 are working. What it is missing is the third step. After the Log "Handling ERROR: ..." I see another info log:

    a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception during sending a 'GenericMessage [payload=byte[142], headers={aws_shard=shardId-000000000000, id=bb56042b-240f-43b7-4c69-90de463bfeca, sourceData={SequenceNumber: 49645044161965514128638937043897685293310641448383676418,ApproximateArrivalTimestamp: Fri Oct 06 12:41:37 CEST 2023,Data: java.nio.HeapByteBuffer[pos=0 lim=142 cap=142],PartitionKey: 2007189807,}, aws_receivedPartitionKey=2007189807, aws_receivedStream=mpeNotificationFraud, aws_receivedSequenceNumber=49645044161965514128638937043897685293310641448383676418, timestamp=1696588898426}]'
    for the '{SequenceNumber: 49645044161965514128638937043897685293310641448383676418,ApproximateArrivalTimestamp: Fri Oct 06 12:41:37 CEST 2023,Data: java.nio.HeapByteBuffer[pos=0 lim=142 cap=142],PartitionKey: 2007189807,}'.
    Consider to use 'errorChannel' flow for the compensation logic.
    

    Then there is this exception (Caused by: java.lang.RuntimeException: test2):

    org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@d13016c]; nested exception is java.lang.RuntimeException: test2
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.5.19.jar:5.5.19]
        at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:113) ~[spring-integration-core-5.5.19.jar:5.5.19]
    

    What I want is that the errorChannel I have configured start working, so for this simple example it logs "Handling ERROR in errorChannel error handler".

    See docs how errors must be handled from now on: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_handle_error_messages

    @Bean
    public Consumer<ErrorMessage> myErrorHandler() {
    spring.cloud.stream.bindings.destination1-in-0.error-handler-definition=myErrorHandler
                    I think that in this way the error handler is specific for the destination1 stream. I was looking for something that I can use for each stream as a fallback if the specific errorHandler for stream is not present or there is an error.  Running my app without the bean "errorChannel" I see this log: No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.  I was thinking I could have used it but it's not working.
    – AM13
                    Oct 16, 2023 at 12:28
                    As far as I know the destination1.group1.errors doesn't work any more: we have to use that error-handler-definition. Then you  an just subscribe to the global errorChannel which is always there created by the framework for us.
    – Artem Bilan
                    Oct 16, 2023 at 14:33
                    tried the annotation @ServiceActivator(inputChannel = "errorChannel") above public void errorHandle but I cannot see the log of the bean "errorChannel" that I have create. Anyway destination1.group1.errors works for spring cloud stream 3.2.9
    – AM13
                    Oct 18, 2023 at 8:24
            

    Thanks for contributing an answer to Stack Overflow!

    • Please be sure to answer the question. Provide details and share your research!

    But avoid

    • Asking for help, clarification, or responding to other answers.
    • Making statements based on opinion; back them up with references or personal experience.

    To learn more, see our tips on writing great answers.