I need to make two HTTP calls, either in sequence, where the second call depends on the result of the first, or in parallel, combining their results.
Case 1: For sequential, dependent calls, I use "zipWhen" to combine the results.
Case 2: For parallel, asynchronous calls, I use "zip" to subscribe to the publishers of both HTTP requests and combine the results.
In both cases, the ByteBuf holding the first response is released by the time the second call is made, which results in an IllegalReferenceCountException when I try to access the first response:
(string1, string2) -> StandardCharsets.UTF_8.decode(string1.getT2().nioBuffer()).toString()
I suspect this has to do with responseSingle, but I am not sure why. I use responseSingle because I need both the headers and the body of the response during response processing.
I am including an edited code sample below for Case 1 (sequential), based on the reactor-netty workshop GitHub repo, which shows this behaviour:
package io.spring.workshop.reactornetty.http;

import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.server.HttpServer;

import java.nio.charset.StandardCharsets;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

public class HttpCompressionTests {

    @Test
    public void httpCompressionTest() {
        DisposableServer server =
                HttpServer.create()   // Prepares a HTTP server for configuration.
                          .port(0)    // Configures the port number as zero, this will let the system pick up
                                      // an ephemeral port when binding the server.
                          .handle((req, res) -> res.sendString(Mono.just("compressed response")))
                          .compress(true) // Enables compression.
                          .wiretap(true)  // Applies a wire logger configuration.
                          .bindNow(); // Starts the server in a blocking fashion, and waits for it to finish initializing.

        assertNotNull(server);

        String response =
                HttpClient.create()            // Prepares a HTTP client for configuration.
                          .port(server.port()) // Obtains the server's port and provides it as a port to which this
                                               // client should connect.
                          .compress(true)      // Enables compression.
                          .wiretap(true)       // Applies a wire logger configuration.
                          .get()               // Specifies that GET method will be used.
                          .uri("/test")        // Specifies the path.
                          .responseSingle((res, body) -> Mono.zip(Mono.just(res), body.defaultIfEmpty(Unpooled.EMPTY_BUFFER))) // Receives the response body.
                          // .aggregate()
                          // .asString()
                          .log("http-client")
                          .zipWhen(string1 ->
                                  HttpClient.create()            // Prepares a HTTP client for configuration.
                                            .port(server.port()) // Obtains the server's port and provides it as a port to which this
                                                                 // client should connect.
                                            .compress(true)      // Enables compression.
                                            .wiretap(true)       // Applies a wire logger configuration.
                                            .get()               // Specifies that GET method will be used.
                                            .uri("/test")        // Specifies the path.
                                            .responseSingle((res, body) -> Mono.zip(Mono.just(res), body.defaultIfEmpty(Unpooled.EMPTY_BUFFER))), // Receives the response body.
                                  // Reads the FIRST response's buffer; this is where the exception is thrown.
                                  (tuple1, tuple2) -> StandardCharsets.UTF_8.decode(tuple1.getT2().nioBuffer()).toString()
                          ).block();

        assertEquals("compressed response", response);

        server.disposeNow(); // Stops the server and releases the resources.
    }
}
This results in:
io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454)
at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1383)
at io.netty.buffer.CompositeByteBuf.nioBuffer(CompositeByteBuf.java:1663)
at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:1231)
at io.spring.workshop.reactornetty.http.HttpCompressionTests.lambda$httpCompressionTest$4(HttpCompressionTests.java:59)
at reactor.core.publisher.Mono.lambda$null$45(Mono.java:5073)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251)
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336)
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:101)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:184)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:128)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:400)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:419)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:473)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:702)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
Any help will be greatly appreciated. Thanks!!
I have tried retaining the first response's ByteBuf, and that seems to work, but then I have to decrement the reference count manually, which I fear may lead to memory leaks.
Reactor Netty will automatically release the ByteBuf if it is not consumed. You can disable this automatic release by invoking body.retain(); however, once you are finished with the ByteBuf, it is your responsibility to release it. The code above can be modified like this:
.responseSingle((res, body) -> Mono.zip(Mono.just(res), body.retain().defaultIfEmpty(Unpooled.EMPTY_BUFFER))) // Receives the response body.
More details can be found in the Javadoc.
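To make the retain/release bookkeeping concrete, here is a minimal, library-free sketch of the counting involved. It is a toy model under stated assumptions: `ToyBuf`, `emit` and `readAndRelease` are hypothetical names invented for illustration and are not Netty or Reactor Netty APIs. `emit` only imitates a producer that drops its own reference after handing the buffer on.

```java
import java.nio.charset.StandardCharsets;

// Toy model only: ToyBuf is a hypothetical stand-in for a reference-counted
// buffer. It is NOT Netty's ByteBuf and only mimics the counting rules.
final class RefCountSketch {

    static final class ToyBuf {
        private final byte[] data;
        private int refCnt = 1; // a freshly created buffer holds one reference

        ToyBuf(String text) {
            this.data = text.getBytes(StandardCharsets.UTF_8);
        }

        // Adds one reference so the buffer survives one extra release().
        ToyBuf retain() {
            ensureAccessible();
            refCnt++;
            return this;
        }

        // Drops one reference; returns true when the count reaches zero.
        boolean release() {
            ensureAccessible();
            return --refCnt == 0;
        }

        int refCnt() {
            return refCnt;
        }

        String readString() {
            ensureAccessible();
            return new String(data, StandardCharsets.UTF_8);
        }

        private void ensureAccessible() {
            if (refCnt == 0) {
                throw new IllegalStateException("refCnt: 0");
            }
        }
    }

    // Imitates a producer that releases its own reference after handing the
    // buffer on. With retainForCaller set, the caller keeps one reference.
    static ToyBuf emit(String text, boolean retainForCaller) {
        ToyBuf buf = new ToyBuf(text);
        if (retainForCaller) {
            buf.retain();
        }
        buf.release(); // the producer's automatic release
        return buf;
    }

    // The caller's side of the contract: read, then always release.
    static String readAndRelease(ToyBuf buf) {
        try {
            return buf.readString();
        } finally {
            buf.release();
        }
    }

    public static void main(String[] args) {
        ToyBuf kept = emit("compressed response", true);
        System.out.println(readAndRelease(kept)); // prints "compressed response"
        System.out.println(kept.refCnt());        // prints 0

        ToyBuf lost = emit("compressed response", false);
        try {
            lost.readString();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());   // prints "refCnt: 0"
        }
    }
}
```

The same shape would apply to the retained buffer in the test above: read `tuple1.getT2()` inside a `try` block and call `release()` on it in `finally`, so the extra reference taken by `retain()` is always given back, even if decoding throws.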