Sequential calls with reactor-netty HTTP client seem to free the ByteBuf of the first response and cause IllegalReferenceCountException


I need to make two HTTP calls in sequence, where the second depends on the first, and in another case to make two parallel calls and combine their results.

Case 1: for sequential calls that depend on one another, I use "zipWhen" to combine the results.

Case 2: for parallel asynchronous calls, I use "zip" to subscribe to the publishers of both HTTP requests and combine the results.

In both cases, I have found that the ByteBuf holding the first response is released automatically when the second call is made, which results in reference-count exceptions when I try to access the first response:

(string1, string2) -> StandardCharsets.UTF_8.decode(string1.getT2().nioBuffer()).toString()

I suspect this has to do with responseSingle, but I am not sure why. I am using responseSingle because I want both the headers and the body of the response during response processing.

I am including an edited code sample below for Case 1 (sequential), taken from the reactor-netty workshop GitHub repo, that shows this behaviour:

package io.spring.workshop.reactornetty.http;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.server.HttpServer;
import java.nio.charset.StandardCharsets;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class HttpCompressionTests {
    @Test
    public void httpCompressionTest() {
        DisposableServer server =
                HttpServer.create()   // Prepares a HTTP server for configuration.
                          .port(0)    // Configures the port number as zero, this will let the system pick up
                                      // an ephemeral port when binding the server.
                          .handle((req, res) -> res.sendString(Mono.just("compressed response")))
                          .compress(true) // Enables compression.
                          .wiretap(true)  // Applies a wire logger configuration.
                          .bindNow(); // Starts the server in a blocking fashion, and waits for it to finish initializing.
        assertNotNull(server);
        String response =
                HttpClient.create()            // Prepares a HTTP client for configuration.
                          .port(server.port()) // Obtains the server's port and provides it as a port to which this
                                               // client should connect.
                          .compress(true)          // Enables compression.
                          .wiretap(true)           // Applies a wire logger configuration.
                          .get()               // Specifies that GET method will be used.
                          .uri("/test")        // Specifies the path.
                          .responseSingle((res, body) -> Mono.zip(Mono.just(res), body.defaultIfEmpty(Unpooled.EMPTY_BUFFER)))   // Receives the response body.
//                          .aggregate()
//                          .asString()
                          .log("http-client")
                          .zipWhen(string1 ->
                              HttpClient.create()            // Prepares a HTTP client for configuration.
                                      .port(server.port()) // Obtains the server's port and provides it as a port to which this
                                      // client should connect.
                                      .compress(true)          // Enables compression.
                                      .wiretap(true)           // Applies a wire logger configuration.
                                      .get()               // Specifies that GET method will be used.
                                      .uri("/test")        // Specifies the path.
                                      .responseSingle((res, body) -> Mono.zip(Mono.just(res), body.defaultIfEmpty(Unpooled.EMPTY_BUFFER))),   // Receives the response body.
                                      (tuple1, tuple2) -> StandardCharsets.UTF_8.decode(tuple1.getT2().nioBuffer()).toString()
                          ).block();
        assertEquals("compressed response", response);
        server.disposeNow();          // Stops the server and releases the resources.
    }
}

This results in:

io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454)
at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1383)
at io.netty.buffer.CompositeByteBuf.nioBuffer(CompositeByteBuf.java:1663)
at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:1231)
at io.spring.workshop.reactornetty.http.HttpCompressionTests.lambda$httpCompressionTest$4(HttpCompressionTests.java:59)
at reactor.core.publisher.Mono.lambda$null$45(Mono.java:5073)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251)
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336)
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:101)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:184)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:128)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:400)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:419)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:473)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:702)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)

Any help will be greatly appreciated. Thanks!!

I have tried retaining the ByteBuf of my first response, and that seems to work, but then I have to decrement the reference count manually, which I feel may lead to memory leaks.

Reactor Netty automatically releases the ByteBuf if it is not consumed. You can disable this automatic release by invoking body.retain(); however, once you have finished with the ByteBuf, it is your responsibility to release it. The code above can be modified like this:

.responseSingle((res, body) -> Mono.zip(Mono.just(res), body.retain().defaultIfEmpty(Unpooled.EMPTY_BUFFER)))   // Receives the response body.
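To make the retain/release contract concrete, here is a minimal sketch using only the JDK. ToyBuffer, RefCountSketch and emit are hypothetical names invented for this illustration; they are not Netty or Reactor Netty APIs, and the toy only approximates the behaviour of a reference-counted buffer that a framework releases after handing it to a callback.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Toy stand-in for a reference-counted buffer (hypothetical, NOT Netty's ByteBuf). */
final class ToyBuffer {
    private final byte[] data;
    private int refCnt = 1; // a new buffer starts with one reference

    ToyBuffer(String text) {
        this.data = text.getBytes(StandardCharsets.UTF_8);
    }

    /** Adds a reference, so one extra release() is needed before the buffer is freed. */
    ToyBuffer retain() {
        ensureAccessible();
        refCnt++;
        return this;
    }

    /** Drops a reference; returns true when the count reaches zero. */
    boolean release() {
        ensureAccessible();
        return --refCnt == 0;
    }

    String readUtf8() {
        ensureAccessible();
        return new String(data, StandardCharsets.UTF_8);
    }

    private void ensureAccessible() {
        if (refCnt == 0) {
            throw new IllegalStateException("refCnt: 0");
        }
    }
}

final class RefCountSketch {
    /** Assumed framework behaviour: hand the buffer to the callback, then release it. */
    static <T> T emit(ToyBuffer buf, Function<ToyBuffer, T> callback) {
        try {
            return callback.apply(buf);
        } finally {
            buf.release();
        }
    }

    /** The buffer escapes the callback without retain(), so the late read fails. */
    static String readLateWithoutRetain() {
        ToyBuffer escaped = emit(new ToyBuffer("compressed response"), b -> b);
        try {
            return escaped.readUtf8();
        } catch (IllegalStateException e) {
            return "error: " + e.getMessage();
        }
    }

    /** retain() keeps the buffer alive; the caller must release it, here in finally. */
    static String readLateWithRetain() {
        ToyBuffer escaped = emit(new ToyBuffer("compressed response"), ToyBuffer::retain);
        try {
            return escaped.readUtf8();
        } finally {
            escaped.release();
        }
    }

    /** Decoding inside the callback means no buffer escapes and nothing is released by hand. */
    static String decodeInsideCallback() {
        return emit(new ToyBuffer("compressed response"), ToyBuffer::readUtf8);
    }

    public static void main(String[] args) {
        System.out.println(readLateWithoutRetain());
        System.out.println(readLateWithRetain());
        System.out.println(decodeInsideCallback());
    }
}
```

The second variant mirrors the body.retain() fix: it works, but every retain() needs a matching release(), ideally in a finally block or an equivalent cleanup hook. The third variant avoids manual releases entirely. With Reactor Netty, the closest equivalent is probably to decode the body inside the responseSingle callback, for example with body.asString(), so that only a String leaves the callback; treat that as a suggestion to verify against your version rather than a confirmed fix.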

More can be found in the javadoc
