java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')

I am using Spring Boot 2.7.0 with the Spring Cloud microservices stack. I am trying to send a notification through Kafka and get the error below.

Error -

2022-06-12 13:18:51.114  INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-2] Instantiated an idempotent producer.
2022-06-12 13:18:51.179  INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.1
2022-06-12 13:18:51.180  INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 97671528ba54a138
2022-06-12 13:18:51.181  INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1655020131179
2022-06-12 13:18:51.221  INFO [order-service,,] 21889 --- [ad | producer-2] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-2] Cluster ID: tK0r-yw2TY-UsYDTFnMFsQ
2022-06-12 13:18:51.223  INFO [order-service,,] 21889 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2] ProducerId set to 8004 with epoch 0
2022-06-12 13:18:51.332  INFO [order-service,,] 21889 --- [ad | producer-2] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-2] Resetting the last seen epoch of partition t-order-0 to 0 since the associated topicId changed from null to 0H45cT8iQniqxpT2hWBtiQ
2022-06-12 13:18:51.372 ERROR [order-service,52ea0644f93a80da,a074556857b1af66] 21889 --- [o-auto-1-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] threw exception
java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:954) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:914) ~[kafka-clients-3.1.1.jar:na]
    at brave.kafka.clients.TracingProducer.send(TracingProducer.java:129) ~[brave-instrumentation-kafka-clients-5.13.9.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:993) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:655) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:403) ~[spring-kafka-2.8.6.jar:2.8.6]
    at com.example.orderservice.service.OrderService.placeOrder(OrderService.java:98) ~[classes/:na]
    at com.example.orderservice.service.OrderService$$FastClassBySpringCGLIB$$35786a76.invoke(<generated>) ~[classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.20.jar:5.3.20]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.20.jar:5.3.20]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708) ~[spring-aop-5.3.20.jar:5.3.20]
    at com.example.orderservice.service.OrderService$$EnhancerBySpringCGLIB$$fd989b0.placeOrder(<generated>) ~[classes/:na]
    at com.example.orderservice.controller.OrderController.lambda$0(OrderController.java:33) ~[classes/:na]
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692) ~[na:na]
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[na:na]
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) ~[na:na]
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) ~[na:na]
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) ~[na:na]
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) ~[na:na]

Order-service

package com.example.orderservice.service;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.transaction.Transactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import com.example.orderservice.dto.InventoryResponse;
import com.example.orderservice.dto.OrderDto;
import com.example.orderservice.dto.OrderLineItemsDto;
import com.example.orderservice.dto.OrderRequest;
import com.example.orderservice.model.Order;
import com.example.orderservice.model.OrderLineItems;
import com.example.orderservice.repository.OrderRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import brave.Span;
import brave.Tracer;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
@Transactional
public class OrderService {
    private static final String INVENTORY_SERVICE_URI = "http://inventory-service/api/inventory";
    @Autowired
    private OrderRepository orderRepository;
    @Autowired
    private WebClient.Builder webClientBuilder;
    @Autowired
    private Tracer tracer;
    @Autowired
    private StreamBridge streamBridge;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    private ObjectMapper objectMapper;
    public String placeOrder(OrderRequest orderRequest) {
        log.info("OrderService | placeOrder method called ");
        List<OrderLineItems> orderLineItems = orderRequest.getOrderLineItemsDtoList()
                .stream()
                .map(this::mapToDto)
                .collect(Collectors.toList());
        Order order = new Order();
        order.setOrderNumber(UUID.randomUUID().toString());
        order.setOrderLineItemsList(orderLineItems);
        List<String> skuCodes = order.getOrderLineItemsList()
                .stream()
                .map(orderLineItem -> orderLineItem.getSkuCode())
                .collect(Collectors.toList());
        Span inventoryServiceLookup = tracer.nextSpan().name("InventoryServiceLookup");
        try (Tracer.SpanInScope isLookup = tracer.withSpanInScope(inventoryServiceLookup.start())){
            inventoryServiceLookup.tag("call", "inventory-service");
            // Call Inventory Service, and place order if product is in stock
            boolean allProductsInStock = webClientBuilder.build()
                    .get()
                    .uri(INVENTORY_SERVICE_URI, uriBuilder -> uriBuilder.queryParam("skuCode", skuCodes).build())
                    .retrieve()
                    .bodyToMono(InventoryResponse[].class)
                    .map(e -> Arrays.stream(e))
                    .block()
                    .allMatch(InventoryResponse::isInStock);
            if(allProductsInStock){
                orderRepository.save(order);
                // send notification to RabbitMQ
                streamBridge.send("notificationEventSupplier-out-0", this.getMessage(order));
                // Kafka
                String orderStr = getJsonString(new OrderDto(order.getOrderNumber()));
                kafkaTemplate.send("t-order", orderStr);
                return "Order Placed Successfully";
            } else {
                throw new IllegalArgumentException("Product is not in the stock, please try again later");
            }
        }finally {
            inventoryServiceLookup.flush();
        }
    }
    private OrderLineItems mapToDto(OrderLineItemsDto orderLineItemsDto) {
        OrderLineItems orderLineItems = new OrderLineItems();
        orderLineItems.setPrice(orderLineItemsDto.getPrice());
        orderLineItems.setQuantity(orderLineItemsDto.getQuantity());
        orderLineItems.setSkuCode(orderLineItemsDto.getSkuCode());
        return orderLineItems;
    }
    private Message<OrderDto> getMessage(Order order){
        return MessageBuilder.withPayload(new OrderDto(order.getOrderNumber())).build();
    }
    private String getJsonString(OrderDto orderDto) {
        String json = null;
        try {
            json = objectMapper.writeValueAsString(orderDto);
        } catch (JsonProcessingException e) {
            log.error("JsonProcessingException | e", e);
        }
        return json;
    }
}

application.properties

spring.application.name=order-service
#MySQL DB
spring.datasource.url=jdbc:mysql://localhost:3306/order-service
spring.datasource.username=root
spring.datasource.password=Password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.jpa.database-platform=org.hibernate.dialect.MySQL5InnoDBDialect
spring.jpa.hibernate.ddl-auto=update
#spring.jpa.show-sql=true
spring.sql.init.mode=always
#spring.jpa.properties.hibernate.format_sql=true
server.port=0
#Eureka
#eureka.instance.prefer-ip-address=true
eureka.instance.hostname=localhost
eureka.client.serviceUrl.defaultZone=http://eureka:password@localhost:8761/eureka
eureka.client.register-with-eureka=true
eureka.client.fetch-registry=true
#spring.cloud.discovery.enabled=true
management.health.circuitbreakers.enabled=true
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
#Resilience4j Properties
resilience4j.circuitbreaker.instances.inventory.registerHealthIndicator=true
resilience4j.circuitbreaker.instances.inventory.event-consumer-buffer-size=10
resilience4j.circuitbreaker.instances.inventory.slidingWindowType=COUNT_BASED
resilience4j.circuitbreaker.instances.inventory.slidingWindowSize=5
resilience4j.circuitbreaker.instances.inventory.failureRateThreshold=50
resilience4j.circuitbreaker.instances.inventory.waitDurationInOpenState=5s
resilience4j.circuitbreaker.instances.inventory.permittedNumberOfCallsInHalfOpenState=3
resilience4j.circuitbreaker.instances.inventory.automaticTransitionFromOpenToHalfOpenEnabled=true
#Resilience4J Timeout Properties
resilience4j.timelimiter.instances.inventory.timeout-duration=3s
#Resilience4J Retry Properties
resilience4j.retry.instances.inventory.max-attempts=3
resilience4j.retry.instances.inventory.wait-duration=5s
#Spring Cloud Stream Kafka Properties
spring.cloud.stream.output-bindings=notificationEventSupplier
spring.cloud.stream.bindings.notificationEventSupplier-out-0.destination=notification-events
spring.sleuth.integration.enabled=true
#Zipkin Properties
spring.zipkin.base-url=http://localhost:9411/
spring.sleuth.sampler.probability=1

You haven't configured a StringSerializer anywhere (or at least not in the code shown).

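So the producer behind your KafkaTemplate is using ByteArraySerializer for the value, as the first frame of the stack trace shows. That serializer expects a byte[] (printed as [B in the exception message) and fails when it receives your String.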
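To illustrate why the mismatch only surfaces at send time, here is a minimal, stdlib-only sketch. ValueSerializer and PassThroughBytesSerializer are hypothetical stand-ins for Kafka's Serializer and ByteArraySerializer, not the real classes, and sendRaw only mimics a producer whose generic types have been erased. The assumption is that the real failure happens the same way: the value reaches the serializer as a plain Object and is cast to byte[] there.

```java
import java.nio.charset.StandardCharsets;

class SerializerMismatchDemo {
    // Hypothetical stand-in for Kafka's Serializer<T>; not the real interface.
    interface ValueSerializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Hypothetical stand-in for ByteArraySerializer: returns the byte[] unchanged.
    static class PassThroughBytesSerializer implements ValueSerializer<byte[]> {
        @Override
        public byte[] serialize(String topic, byte[] data) {
            return data;
        }
    }

    // Mimics a producer after type erasure: the value is just an Object here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static byte[] sendRaw(ValueSerializer serializer, String topic, Object value) {
        // The compiler-generated bridge method casts value to byte[] on this call.
        return serializer.serialize(topic, value);
    }

    // Returns true when sending the value fails with a ClassCastException.
    static boolean failsWithClassCast(Object value) {
        try {
            sendRaw(new PassThroughBytesSerializer(), "t-order", value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        String json = "{\"orderNumber\":\"abc\"}";
        // A String value hits the byte[] cast and fails.
        System.out.println(failsWithClassCast(json));
        // The same content as byte[] passes through.
        System.out.println(failsWithClassCast(json.getBytes(StandardCharsets.UTF_8)));
    }
}
```

The declared type KafkaTemplate<String, String> does not help here, because generics are erased at runtime and the serializer is chosen by the producer configuration, not by the template's type parameters.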

Instead of objectMapper.writeValueAsString, you could use objectMapper.writeValueAsBytes and change the relevant method return types and the Kafka value generic type (for example KafkaTemplate<String, String>) to byte[].
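A rough sketch of the byte[] approach, using only the standard library so it runs on its own. The hand-built JSON stands in for objectMapper.writeValueAsBytes(orderDto), which encodes as UTF-8. The orderNumber field name and the helper names are assumptions for illustration, since OrderDto is not shown in the question.

```java
import java.nio.charset.StandardCharsets;

class BytesPayloadSketch {
    // Stand-in for objectMapper.writeValueAsBytes(orderDto): JSON text encoded as UTF-8.
    // Assumes orderNumber contains no characters that need JSON escaping (a UUID does not).
    static byte[] toJsonBytes(String orderNumber) {
        String json = "{\"orderNumber\":\"" + orderNumber + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // What a consumer reading raw bytes would do to get the JSON text back.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = toJsonBytes("abc");
        // In the service this would become, with a KafkaTemplate<String, byte[]>:
        // kafkaTemplate.send("t-order", payload);
        System.out.println(decode(payload));
    }
}
```

With this shape, getJsonString would return byte[] rather than String, and the injected template would be declared as KafkaTemplate<String, byte[]> so that the value type matches what the serializer expects.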
