Collectives™ on Stack Overflow
Find centralized, trusted content and collaborate around the technologies you use most.
Learn more about Collectives
Teams
Q&A for work
Connect and share knowledge within a single location that is structured and easy to search.
Learn more about Teams
I have a webapp in which I use Kinesis Stream with spring-cloud-stream, spring-cloud-stream-binder-kinesis and spring-cloud-starter-sleuth. What I would like to do is to configure the error handling properly. I would like to:
handle a message stream
if something go wrong in the point 1 handle the error (specifically for each stream)
create a errorChannel that handle all errors globally that happened in point 2.
Properties:
spring:
cloud:
stream:
function:
definition: destination1
bindings:
destination1-in-0:
destination: destination1
group: group1
This is the configuration code (in a class annotated with @Configuration):
@Bean
public Consumer<MyStream> destination1() {
return message -> {
if (true) throw new RuntimeException("test");
LOGGER.info("message: " + message);
@ServiceActivator(inputChannel = "destination1.group1.errors")
public void errorHandle(Message<?> message) {
// TODO
LOGGER.error("Handling ERROR: " + message);
throw new RuntimeException("test2");
@Bean
public PublishSubscribeChannel errorChannel() {
ErrorHandler errorHandler = throwable -> LOGGER.error("Handling ERROR in errorChannel error handler: ");
Executor executor = new ConcurrentTaskExecutor();
PublishSubscribeChannel publishSubscribeChannel = new PublishSubscribeChannel(executor);
publishSubscribeChannel.setErrorHandler(errorHandler);
return publishSubscribeChannel;
Please note that step 1 and step 2 are working. What it is missing is the third step.
After the Log "Handling ERROR: ..." I see another info log:
a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception during sending a 'GenericMessage [payload=byte[142], headers={aws_shard=shardId-000000000000, id=bb56042b-240f-43b7-4c69-90de463bfeca, sourceData={SequenceNumber: 49645044161965514128638937043897685293310641448383676418,ApproximateArrivalTimestamp: Fri Oct 06 12:41:37 CEST 2023,Data: java.nio.HeapByteBuffer[pos=0 lim=142 cap=142],PartitionKey: 2007189807,}, aws_receivedPartitionKey=2007189807, aws_receivedStream=mpeNotificationFraud, aws_receivedSequenceNumber=49645044161965514128638937043897685293310641448383676418, timestamp=1696588898426}]'
for the '{SequenceNumber: 49645044161965514128638937043897685293310641448383676418,ApproximateArrivalTimestamp: Fri Oct 06 12:41:37 CEST 2023,Data: java.nio.HeapByteBuffer[pos=0 lim=142 cap=142],PartitionKey: 2007189807,}'.
Consider to use 'errorChannel' flow for the compensation logic.
Then there is this exception (Caused by: java.lang.RuntimeException: test2):
org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@d13016c]; nested exception is java.lang.RuntimeException: test2
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.5.19.jar:5.5.19]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:113) ~[spring-integration-core-5.5.19.jar:5.5.19]
What I want is that the errorChannel I have configured start working, so for this simple example it logs "Handling ERROR in errorChannel error handler".
See docs how errors must be handled from now on: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_handle_error_messages
@Bean
public Consumer<ErrorMessage> myErrorHandler() {
spring.cloud.stream.bindings.destination1-in-0.error-handler-definition=myErrorHandler
–
–
–
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.