App properties

spring.cloud.stream.bindings.publisher.destination=my-kafka-topic
spring.cloud.stream.bindings.publisher.producer.header-mode=headers
spring.cloud.stream.bindings.publisher.content-type=application/json
spring.cloud.stream.bindings.subscriber.destination=my-kafka-topic
spring.cloud.stream.bindings.subscriber.consumer.header-mode=headers
spring.cloud.stream.bindings.subscriber.content-type=application/json

It all works fine. Messages sent using the publisher are received.
Now I'm trying to send a message to this topic from another app, using KafkaTemplate:

kafkaTemplate.send(topic, message)

This time an error is thrown on the receiving side:

Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'MyApp.subscriber'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=com.bax.so.MyEvent@6da11fec, headers={b3=[B@304c5b9f, kafka_offset=10, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@742c6888, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=my-kafka-topic, kafka_receivedTimestamp=1578085559878, kafka_groupId=my-default-group-id}]
   at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
   at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
   at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
   at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
   at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
   at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
   at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
   at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
   at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369)
   at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$400(KafkaMessageDrivenChannelAdapter.java:74)
   at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431)
   at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402)
   at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
   at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
   at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
   at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
   at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1592)
   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1575)
   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1534)
   ... 8 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
   at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138)
   at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
   at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
   ... 27 common frames omitted

Spring version 5+.
Is this a valid scenario at all: sending messages with KafkaTemplate and expecting them to be received by a Spring Cloud Stream subscriber?

Yes, it's a valid scenario; something is preventing the binding from being wired up properly. You need to show the binding properties for the subscriber as well. It's best to show all your code and configuration instead of just snippets. – Gary Russell Jan 4, 2020 at 15:40

You need spring.cloud.stream.bindings.subscriber.destination=... and ...consumer... properties. By default it will be bound to a topic called subscriber, so the missing properties, per se, are not the problem. – Gary Russell Jan 6, 2020 at 18:51

@GaryRussell Added subscriber props as well. As pointed out in the question, messages sent using the publisher are successfully delivered to the subscriber. – Bax Jan 6, 2020 at 19:02

Your @StreamListener is bound to the publisher channel instead of the subscriber channel.
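To illustrate why attaching the handler to the wrong channel produces "Dispatcher has no subscribers", here is a minimal plain-Java sketch. It is a toy model, not Spring Integration's actual implementation: `ToyChannel` and `ChannelSketch` are hypothetical names invented for this illustration, and the real dispatcher does considerably more (load balancing, failover, error handling).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of a subscribable channel; hypothetical, not the Spring Integration API. */
public class ChannelSketch {

    /** Hands each message to its first subscriber and fails if there is none. */
    static class ToyChannel {
        private final String name;
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        ToyChannel(String name) {
            this.name = name;
        }

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String payload) {
            if (subscribers.isEmpty()) {
                // Mirrors the wording of the error in the question's stack trace.
                throw new IllegalStateException(
                        "Dispatcher has no subscribers for channel '" + name + "'");
            }
            subscribers.get(0).accept(payload);
        }
    }

    public static void main(String[] args) {
        ToyChannel publisher = new ToyChannel("publisher");
        ToyChannel subscriber = new ToyChannel("subscriber");
        List<String> received = new ArrayList<>();

        // The mistake: the handler is attached to the output channel.
        publisher.subscribe(received::add);

        try {
            // A record arriving from the topic is dispatched to the input channel.
            subscriber.send("foo");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
            // prints: Dispatcher has no subscribers for channel 'subscriber'
        }

        // The fix: attach the handler to the input channel instead.
        subscriber.subscribe(received::add);
        subscriber.send("foo");
        System.out.println(received); // prints: [foo]
    }
}
```

In this model, the inbound record always lands on the input channel, so only a handler registered there can receive it.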

Here is a working example:

@SpringBootApplication
@EnableBinding(MyKafkaBinding.class)
public class So59585815Application {

    public static void main(String[] args) {
        SpringApplication.run(So59585815Application.class, args);
    }

    @Autowired
    private MessageChannel publisher;

    // Consumes from the "subscriber" binding and republishes the upper-cased payload.
    @StreamListener("subscriber")
    public void listen(String in) {
        publisher.send(new GenericMessage<>(in.toUpperCase()));
    }

    // Sends a test record directly to the subscriber's topic using KafkaTemplate.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("subscriber-topic", "foo".getBytes());
        };
    }

    // Prints whatever arrives on the publisher's topic.
    @KafkaListener(id = "listener", topics = "publisher-topic")
    public void listen(byte[] in) {
        System.out.println(new String(in));
    }

}

interface MyKafkaBinding {

    @Output("publisher")
    MessageChannel publisher();

    @Input("subscriber")
    SubscribableChannel subscriber();

}

App properties

spring.cloud.stream.bindings.publisher.destination=publisher-topic
spring.cloud.stream.bindings.subscriber.destination=subscriber-topic
spring.cloud.stream.bindings.subscriber.group=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
