I'm trying to set up a Spring Boot application that listens to Kafka.

I'm using Kafka Streams Binder.

With a single @EnableBinding class:

@EnableBinding(StreamExample.StreamProcessor.class)
public class StreamExample {

    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {
        logger.info("Stream listening");
        return input
                .peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
    }

    interface StreamProcessor {
        String INPUT = "input_1";
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}

and in application.yml

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: localhost:29092
      bindings:
         input_1:
           destination: mytopic1
           group: readgroup
         output_1:
           destination: mytopic2
         input_2:
           destination: mytopic3
           group: readgroup
         output_2:
           destination: mytopic4
  application:
    name: stream_s1000_app

Everything works fine.

But if I add a second class with another binding, the following error occurs:

The following subscribed topics are not assigned to any members: [mytopic1]

Example of the second binding:

@EnableBinding(StreamExampleBindingTwo.StreamProcessor.class)
public class StreamExampleBindingTwo {

    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {
        logger.info("Stream listening binding two");
        return input
                .peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
    }

    interface StreamProcessor {
        String INPUT = "input_2";
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}

What am I missing? Can't I use multiple input topics and multiple outputs in the same application? Is this related to application.name?

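I just tried this in a sample app and it worked. When you have multiple processors in the same application, you need to make sure that each processor gets its own application id. Kafka Streams uses the application id as the consumer group id, so processors that share one id join the same group with different topologies, which is most likely what triggers the assignment error. See below how I set two distinct application ids, one per input binding, in application.yml.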

I saw both processors logging to the console, and I also saw the messages arrive on the output topics.

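import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication
@EnableBinding({So54522918Application.StreamProcessor1.class, So54522918Application.StreamProcessor2.class})
public class So54522918Application {

    public static void main(String[] args) {
        SpringApplication.run(So54522918Application.class, args);
    }

    // First processor: reads from input_1 and forwards to output_1.
    @StreamListener(StreamProcessor1.INPUT)
    @SendTo(StreamProcessor1.OUTPUT)
    public KStream<String, String> process1(KStream<String, String> input) {
        System.out.println("Stream listening");
        return input
                .peek(((key, value) -> System.out.println("key = " + key + ", value = " + value)));
    }

    // Second processor: reads from input_2 and forwards to output_2.
    @StreamListener(StreamProcessor2.INPUT)
    @SendTo(StreamProcessor2.OUTPUT)
    public KStream<String, String> process2(KStream<String, String> input) {
        System.out.println("Stream listening binding two");
        return input
                .peek(((key, value) -> System.out.println("key = " + key + ", value = " + value)));
    }

    interface StreamProcessor1 {
        String INPUT = "input_1";
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }

    interface StreamProcessor2 {
        String INPUT = "input_2";
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}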

Relevant part of application.yml

spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
  binder.configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  bindings.input_1.consumer.application-id: process-1
  bindings.input_2.consumer.application-id: process-2
spring.cloud.stream.bindings.input_1:
  destination: mytopic1
spring.cloud.stream.bindings.output_1:
  destination: mytopic2
spring.cloud.stream.bindings.input_2:
  destination: mytopic3
spring.cloud.stream.bindings.output_2:
  destination: mytopic4
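
For comparison, here is a sketch of how the same per-binding application ids might look when merged into the nested application.yml from the question. This is an untested assumption about how the two files combine: the broker address, topic names and group values are copied from the question, the ids process-1 and process-2 come from the answer's config, and only the commented lines are new relative to the question's file.

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: localhost:29092
          # Added: one distinct application id per input binding
          # (ids taken from the answer's config).
          bindings:
            input_1:
              consumer:
                application-id: process-1
            input_2:
              consumer:
                application-id: process-2
      bindings:
        input_1:
          destination: mytopic1
          group: readgroup
        output_1:
          destination: mytopic2
        input_2:
          destination: mytopic3
          group: readgroup
        output_2:
          destination: mytopic4
  application:
    name: stream_s1000_app
```

Note that the application ids sit under spring.cloud.stream.kafka.streams.bindings, which is a separate block from the spring.cloud.stream.bindings section that holds the destinations.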
You're correct, thanks. I think the documentation should be much clearer about this. Reading the Spring Cloud Stream docs (docs.spring.io/spring-cloud-stream/docs/current/reference/…), this isn't mentioned. So thank you very much again!
– Johnny Willer
Feb 5, 2019 at 15:42
