Collectives™ on Stack Overflow
Find centralized, trusted content and collaborate around the technologies you use most.
Learn more about Collectives
Teams
Q&A for work
Connect and share knowledge within a single location that is structured and easy to search.
Learn more about Teams
App properties
spring.cloud.stream.bindings.publisher.destination=my-kafka-topic
spring.cloud.stream.bindings.publisher.producer.header-mode=headers
spring.cloud.stream.bindings.publisher.content-type=application/json
spring.cloud.stream.bindings.subscriber.destination=my-kafka-topic
spring.cloud.stream.bindings.subscriber.consumer.header-mode=headers
spring.cloud.stream.bindings.subscriber.content-type=application/json
It all works fine. Messages sent using the publisher are received.
Now I'm trying to send a message to this topic from another app, using KafkaTemplate:
kafkaTemplate.send(topic, message)
This time an error is thrown on the receiving side:
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'MyApp.subscriber'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=com.bax.so.MyEvent@6da11fec, headers={b3=[B@304c5b9f, kafka_offset=10, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@742c6888, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=my-kafka-topic, kafka_receivedTimestamp=1578085559878, kafka_groupId=my-default-group-id}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$400(KafkaMessageDrivenChannelAdapter.java:74)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1592)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1575)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1534)
... 8 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
... 27 common frames omitted
Spring version 5+.
Is this a valid scenario at all, sending messages using KafkaTemplate and expect them to be received by a cloud stream subscriber ?
–
–
–
Your @StreamListener
is bound to the publisher channel instead of the subscriber channel.
Here is a working example:
@SpringBootApplication
@EnableBinding(MyKafkaBinding.class)
public class So59585815Application {
public static void main(String[] args) {
SpringApplication.run(So59585815Application.class, args);
@Autowired
private MessageChannel publisher;
@StreamListener("subscriber")
public void listen(String in) {
publisher.send(new GenericMessage<>(in.toUpperCase()));
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
template.send("subscriber-topic", "foo".getBytes());
@KafkaListener(id = "listener", topics = "publisher-topic")
public void listen(byte[] in) {
System.out.println(new String(in));
interface MyKafkaBinding {
@Output("publisher")
MessageChannel publisher();
@Input("subscriber")
SubscribableChannel subscriber();
spring.cloud.stream.bindings.publisher.destination=publisher-topic
spring.cloud.stream.bindings.subscriber.destination=subscriber-topic
spring.cloud.stream.bindings.subscriber.group=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.