I am building a Kafka application that uses log compaction on a topic, but I am not able to send a tombstone value (KafkaNull).

I first tried the default serializer configuration. When that did not work, I applied the changes suggested in "Publish null/tombstone message with raw headers" and set application.properties to:

spring.cloud.stream.output.producer.useNativeEncoding=true
spring.cloud.stream.kafka.binder.configuration.value.serializer=org.springframework.kafka.support.serializer.JsonSerializer

The code I have to send a message to a stream is

this.stockTopics.compactedStocks().send(MessageBuilder
    .withPayload(KafkaNull.INSTANCE)
    .setHeader(KafkaHeaders.MESSAGE_KEY,company.getBytes())
    .build());

this.stockTopics.compactedStocks() returns a message stream that I can send messages to.

Every time I try to send that message with a KafkaNull instance as the payload, I get this error: Failed to convert message: 'GenericMessage [payload=org.springframework.kafka.support.KafkaNull@1c2d8163, headers={id=f81857e7-fbd0-56f5-8418-6a1944e7f2b1, kafka_messageKey=[B@36ec022a, contentType=application/json, timestamp=1547827957485}]' to outbound message.

I expect the message to simply be sent to the consumer with a null value, but instead the send fails with the conversion error.
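
For readers unfamiliar with why a null value matters here: on a compacted topic, a record with a null value acts as a tombstone, and compaction eventually drops that key. The following is only a toy, plain-Java model of that behaviour, not Kafka code; the class `CompactionSketch`, the `Rec` type and the `compact` method are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of log compaction with tombstones. Not Kafka code; all names are hypothetical.
final class CompactionSketch {

    // One log record. A null value marks a tombstone for the key.
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Replays the log in order, keeping the latest value per key
    // and removing any key whose most recent record is a tombstone.
    static Map<String, String> compact(List<Rec> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            if (r.value == null) {
                latest.remove(r.key);        // tombstone: the key disappears
            } else {
                latest.put(r.key, r.value);  // newer value replaces the older one
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec("ACME", "10.5"));
        log.add(new Rec("INIT", "3.2"));
        log.add(new Rec("ACME", null));      // tombstone for ACME
        System.out.println(compact(log));    // prints {INIT=3.2}
    }
}
```

In the question, KafkaNull.INSTANCE is the placeholder payload that is supposed to end up as that null value on the wire.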

Also, what makes you believe that KafkaNull is application/json convertible? The Failed to convert message:... simply means that no suitable message converter was found (we bootstrap you with 7 by default). But you can easily provide your own MessageConverter implementation as a bean annotated with @StreamMessageConverter – Oleg Zhurakousky Jan 18, 2019 at 17:36

Reproduced - I think we need a standard converter for KafkaNull - the outbound channel adapter sends KafkaNull as null, so the channel interceptor needs to pass it unchanged. – Gary Russell Jan 18, 2019 at 17:51

public static void main(String[] args) {
    SpringApplication.run(So54257687Application.class, args);
}

@Bean
public ApplicationRunner runner(MessageChannel output) {
    // Send a KafkaNull payload; the outbound adapter writes it as a null value.
    return args -> output.send(new GenericMessage<>(KafkaNull.INSTANCE));
}

@KafkaListener(id = "foo", topics = "output")
public void listen(@Payload(required = false) byte[] in) {
    System.out.println(in);
}

@Bean
@StreamMessageConverter
public MessageConverter kafkaNullConverter() {
    // Pass-through converter: supports only KafkaNull and returns it unchanged.
    class KafkaNullConverter extends AbstractMessageConverter {

        KafkaNullConverter() {
            super(Collections.emptyList());
        }

        @Override
        protected boolean supports(Class<?> clazz) {
            return KafkaNull.class.equals(clazz);
        }

        @Override
        protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
            return message.getPayload();
        }

        @Override
        protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
            return payload;
        }

    }
    return new KafkaNullConverter();
}
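
To illustrate why "no suitable message converter was found" and why a pass-through converter helps, here is a rough, framework-free sketch of converter selection. It is not how Spring Cloud Stream is implemented; `ConverterChainSketch`, `NullMarker`, `Converter`, `StringToBytes` and `PassThrough` are hypothetical names, with `NullMarker` standing in for a sentinel payload such as KafkaNull.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Toy model of picking a converter by payload type. Not Spring code; all names are hypothetical.
final class ConverterChainSketch {

    // Stand-in for a sentinel payload such as KafkaNull.
    enum NullMarker { INSTANCE }

    interface Converter {
        boolean supports(Class<?> type);
        Object convert(Object payload);
    }

    // Handles String payloads only, turning them into UTF-8 bytes.
    static final class StringToBytes implements Converter {
        public boolean supports(Class<?> type) {
            return String.class.equals(type);
        }
        public Object convert(Object payload) {
            return ((String) payload).getBytes(StandardCharsets.UTF_8);
        }
    }

    // Handles only the sentinel and hands it back unchanged.
    static final class PassThrough implements Converter {
        public boolean supports(Class<?> type) {
            return NullMarker.class.equals(type);
        }
        public Object convert(Object payload) {
            return payload;
        }
    }

    // Uses the first converter that supports the payload type; fails if none does.
    static Object convert(List<Converter> chain, Object payload) {
        for (Converter c : chain) {
            if (c.supports(payload.getClass())) {
                return c.convert(payload);
            }
        }
        throw new IllegalStateException("Failed to convert message: " + payload);
    }
}
```

With only `StringToBytes` in the chain, converting `NullMarker.INSTANCE` throws; adding `PassThrough` lets the sentinel reach the next stage untouched, which is the role the KafkaNull converter bean plays in the answer.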
