Collectives™ on Stack Overflow
Find centralized, trusted content and collaborate around the technologies you use most.
Learn more about Collectives
Teams
Q&A for work
Connect and share knowledge within a single location that is structured and easy to search.
Learn more about Teams
I am building a Kafka Application Using Log Compaction on a Topic but I am not able to send a Tombstone Value (KafkaNull)
I have tried using the default configuration for a serializer and when that did not work I used the suggested changes from "
Publish null/tombstone message with raw headers
" To set the application.properties to:
spring.cloud.stream.output.producer.useNativeEncoding=true
spring.cloud.stream.kafka.binder.configuration.value.serializer=org.springframework.kafka.support.serializer.JsonSerializer
The code I have to send a message to a stream is
this.stockTopics.compactedStocks().send(MessageBuilder
.withPayload(KafkaNull.INSTANCE)
.setHeader(KafkaHeaders.MESSAGE_KEY,company.getBytes())
.build())
this.stopTopics.compactedStocks() returns a messageStream that I can send messages to.
Every time I try and send that message with a KafkaNull instance as a payload I get the error Failed to convert message: 'GenericMessage [payload=org.springframework.kafka.support.KafkaNull@1c2d8163, headers={id=f81857e7-fbd0-56f5-8418-6a1944e7f2b1, kafka_messageKey=[B@36ec022a, contentType=application/json, timestamp=1547827957485}]' to outbound message.
I expect the message to simply be sent to the consumer with a null value but obviously it errors.
–
–
public static void main(String[] args) {
SpringApplication.run(So54257687Application.class, args);
@Bean
public ApplicationRunner runner(MessageChannel output) {
return args -> output.send(new GenericMessage<>(KafkaNull.INSTANCE));
@KafkaListener(id = "foo", topics = "output")
public void listen(@Payload(required = false) byte[] in) {
System.out.println(in);
@Bean
@StreamMessageConverter
public MessageConverter kafkaNullConverter() {
class KafkaNullConverter extends AbstractMessageConverter {
KafkaNullConverter() {
super(Collections.emptyList());
@Override
protected boolean supports(Class<?> clazz) {
return KafkaNull.class.equals(clazz);
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
return message.getPayload();
@Override
protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
return payload;
return new KafkaNullConverter();
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.