Collectives™ on Stack Overflow

Find centralized, trusted content and collaborate around the technologies you use most.

Learn more about Collectives

Teams

Q&A for work

Connect and share knowledge within a single location that is structured and easy to search.

Learn more about Teams

I'm trying out Spring Cloud Data Flow and today I updated to the latest versions and since that I'm not able to create this simple example which should simple log the AMQP message...

rabbit | log

When I deploy this stream and simply publish a String message on the consumed queue, this works ok. But when it is a serialized PoJo it does not. The older versions of the data flow server + started apps based on spring boot 1.5.x did just do this.

Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={"absolute_path":"/+~JF4472914347363856925.tmp","filename":"+~JF4472914347363856925.tmp","timestamp":1536315010932,"sshd_server":"localhost","sshd_port":22}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=sftp.uploaded, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_consumerQueue=sftp_uploaded, amqp_redelivered=false, id=d3d84d90-53ca-4c39-cdef-8665d35ddcf1, amqp_consumerTag=amq.ctag-8kP4KDjn13oae1Qutmw4IA, contentType=text/json, timestamp=1536315013950}]' to outbound message.
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:163) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:475) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:60) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:240) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:207) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    ... 22 common frames omitted
Caused by: java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={"absolute_path":"/+~JF4472914347363856925.tmp","filename":"+~JF4472914347363856925.tmp","timestamp":1536315010932,"sshd_server":"localhost","sshd_port":22}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=sftp.uploaded, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_consumerQueue=sftp_uploaded, amqp_redelivered=false, id=d3d84d90-53ca-4c39-cdef-8665d35ddcf1, amqp_consumerTag=amq.ctag-8kP4KDjn13oae1Qutmw4IA, contentType=text/json, timestamp=1536315013950}]' to outbound message.
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:324) ~[spring-cloud-stream-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:351) ~[spring-cloud-stream-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:589) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:435) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    ... 32 common frames omitted

Versions

  • spring-cloud-dataflow-server-local:1.6.2.RELEASE
  • Darwin-SR1-stream-applications-kafka-maven
  • There were significant changes and enhancements in content type negotiation in Spring Cloud Stream 2.*. You can read up on it here https://docs.spring.io/spring-cloud-stream/docs/Fishtown.BUILD-SNAPSHOT/reference/htmlsingle/#content-type-management. Basically what I am seeing is that you don't have an appropriate MessageConverter in the stack. Also, from what I see your content type is text/json for which we do not provide a converter. Consider changing it to application/json.

    Thanks for contributing an answer to Stack Overflow!

    • Please be sure to answer the question. Provide details and share your research!

    But avoid

    • Asking for help, clarification, or responding to other answers.
    • Making statements based on opinion; back them up with references or personal experience.

    To learn more, see our tips on writing great answers.