Collectives™ on Stack Overflow
Find centralized, trusted content and collaborate around the technologies you use most.
Learn more about Collectives
Teams
Q&A for work
Connect and share knowledge within a single location that is structured and easy to search.
Learn more about Teams
I'm trying to set an Spring Boot Application listening to Kafka.
I'm using Kafka Streams Binder.
With one simple
@EnableBinding
@EnableBinding(StreamExample.StreamProcessor.class)
public class StreamExample {
@StreamListener(StreamProcessor.INPUT)
@SendTo(StreamProcessor.OUTPUT)
public KStream<String, String> process(KStream<String, String> input) {
logger.info("Stream listening");
return input
.peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
interface StreamProcessor {
String INPUT = "input_1";
String OUTPUT = "output_1";
@Input(INPUT)
KStream<String, String> input();
@Output(OUTPUT)
KStream<String, String> outputProcessed();
and in application.yml
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: localhost:29092
bindings:
input_1:
destination: mytopic1
group: readgroup
output_1:
destination: mytopic2
input_2:
destination: mytopic3
group: readgroup
output_2:
destination: mytopic4
application:
name: stream_s1000_app
Everything works fine.
But if I try to add a second class with other binding, the following error occurs:
The following subscribed topics are not assigned to any members: [mytopic1]
Example of the second binding:
@EnableBinding(StreamExampleBindingTwo.StreamProcessor.class)
public class StreamExampleBindingTwo {
@StreamListener(StreamProcessor.INPUT)
@SendTo(StreamProcessor.OUTPUT)
public KStream<String, String> process(KStream<String, String> input) {
logger.info("Stream listening binding two");
return input
.peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
interface StreamProcessor {
String INPUT = "input_2";
String OUTPUT = "output_2";
@Input(INPUT)
KStream<String, String> input();
@Output(OUTPUT)
KStream<String, String> outputProcessed();
What I'm missing? Can't I use multiple input topics and multiple output in the same application? There is something related to application.name?
I just tried an app and that worked. When you have multiple processors in the same application, you need to make sure that each processor gets its own application id.
See below how I have 2 distinct application id's for both inputs in the application.yml
.
I saw both processors are getting logged on the console. Also, saw the messages on the output topics.
@SpringBootApplication
@EnableBinding({So54522918Application.StreamProcessor1.class, So54522918Application.StreamProcessor2.class})
public class So54522918Application {
public static void main(String[] args) {
SpringApplication.run(So54522918Application.class, args);
@StreamListener(StreamProcessor1.INPUT)
@SendTo(StreamProcessor1.OUTPUT)
public KStream<String, String> process1(KStream<String, String> input) {
System.out.println("Stream listening");
return input
.peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
@StreamListener(StreamProcessor2.INPUT)
@SendTo(StreamProcessor2.OUTPUT)
public KStream<String, String> process2(KStream<String, String> input) {
System.out.println("Stream listening binding two");
return input
.peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
interface StreamProcessor1 {
String INPUT = "input_1";
String OUTPUT = "output_1";
@Input(INPUT)
KStream<String, String> input();
@Output(OUTPUT)
KStream<String, String> outputProcessed();
interface StreamProcessor2 {
String INPUT = "input_2";
String OUTPUT = "output_2";
@Input(INPUT)
KStream<String, String> input();
@Output(OUTPUT)
KStream<String, String> outputProcessed();
Relevant part of application.yml
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
binder.configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings.input_1.consumer.application-id: process-1
bindings.input_2.consumer.application-id: process-2
spring.cloud.stream.bindings.input_1:
destination: mytopic1
spring.cloud.stream.bindings.output_1:
destination: mytopic2
spring.cloud.stream.bindings.input_2:
destination: mytopic3
spring.cloud.stream.bindings.output_2:
destination: mytopic4
–
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.