
How does the Camel Netty TCP socket consumer decide how to split incoming data into messages (and is it configurable)?

I'm working with a Camel flow that uses a Netty TCP socket consumer to receive messages from a client program (which is outside of my control). The client should be opening a socket, sending us one message, then closing the socket, but we've been seeing cases where instead of one message Camel is "splitting" the text stream into two parts and trying to process them separately.

So I'm trying to figure out, since you can re-use the same socket for multiple Camel messages, but TCP sockets don't have a built-in concept of "frames" or a standard for message delimiters, how does Camel decide that a complete message has been received and is ready to process? I haven't been able to find a documented answer to this in the Netty component docs ( https://camel.apache.org/components/3.15.x/netty-component.html ), although maybe I'm missing something.

From playing around with a test script, it seems like one answer is "Camel assumes a message is complete and should be processed if it goes more than 1ms without receiving any input on the socket". Is this a correct statement, and if so is this behavior documented anywhere? Is there any way to change or configure this behavior? Really what I would prefer is for Camel to wait for an ETX character (or a much longer timeout) before processing a message, is that possible to set up?

Here's my test setup:

Camel flow:

from("netty:tcp://localhost:3003")
        .log("Received: ${body}");

Python snippet:

import argparse
import math
import socket
import time

DELAY_MS = 3

parser = argparse.ArgumentParser()
parser.add_argument("hostname")
parser.add_argument("port", type=int)
parser.add_argument("msg")
args = parser.parse_args()

def send_msg(sock, msg):
    print("Sending message: <{}>".format(msg))
    sock.sendall(msg.encode())  # sendall() raises on failure
    time.sleep(DELAY_MS / 1000.0)

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    print("Using DELAY_MS: {}".format(DELAY_MS))
    s.connect((args.hostname, args.port))
    # Split the message roughly in half and send the two parts
    # as separate sendall() calls, DELAY_MS apart
    cutoff = int(math.floor(len(args.msg) / 2))
    msg1 = args.msg[:cutoff]
    send_msg(s, msg1)
    msg2 = args.msg[cutoff:]
    send_msg(s, msg2)
    response = s.recv(1024)
except Exception as e:
    print(e)
finally:
    s.close()

I can see that with DELAY_MS=1 Camel logs one single message:

2022-02-21 16:54:40.689  INFO 19429 --- [erExecutorGroup] route1                                   : Received: a long string sent over the socket

But with DELAY_MS=2 it logs two separate messages:

2022-02-21 16:56:12.899  INFO 19429 --- [erExecutorGroup] route1                                   : Received: a long string sen
2022-02-21 16:56:12.899  INFO 19429 --- [erExecutorGroup] route1                                   : Received: t over the socket
                It probably just gives you whatever has arrived, which in turn is determined by how it was segmented and packetized when sent, which you can't control. So you just have to keep reading until you have what you need.
– user207421
                Feb 22, 2022 at 2:37
                Hm, I think "keep reading until you have what you need" would make sense if I was using Netty directly, but Camel by design wraps up what it's received into a [message exchange](https://camel.apache.org/manual/exchange.html) for you and hands it off to your processing logic. So what I'm trying to answer is, by default, where/when/why does Camel stop reading and create that message?
– zoesnape
                Feb 22, 2022 at 4:41
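(For what it's worth, the "keep reading until you have what you need" approach from the first comment can be sketched in plain Python, outside of Camel. This is a hypothetical helper, using an ETX byte as the delimiter, not anything from the Camel or Netty APIs:)

```python
ETX = b"\x03"

def read_until_etx(recv, bufsize=1024):
    """Accumulate chunks from recv() until an ETX byte arrives.

    recv is any callable returning bytes, e.g. a connected
    socket's sock.recv.
    """
    buf = b""
    while ETX not in buf:
        chunk = recv(bufsize)
        if not chunk:
            # Peer closed the connection before sending ETX
            raise ConnectionError("connection closed before ETX")
        buf += chunk
    # Return the message without the ETX terminator (anything
    # after it would belong to the next message)
    return buf.split(ETX, 1)[0]
```

Fed the two fragments from the test above, this reassembles the full string once the delimiter arrives, regardless of how TCP segmented the sends.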

After doing some more research, it seems like what I need to do is add a delimiter-based FrameDecoder to the decoders list.

Setting it up like this:

    from("netty:tcp://localhost:3003?sync=true"
         + "&decoders=#frameDecoder,#stringDecoder"
         + "&encoders=#stringEncoder")

where frameDecoder is provided by

@Bean
ChannelHandlerFactory frameDecoder() {
    // ETX (ASCII 0x03) as the frame delimiter
    ByteBuf[] ETX_DELIM = new ByteBuf[] { Unpooled.wrappedBuffer(new byte[] { (byte) 3 }) };
    return ChannelHandlerFactories.newDelimiterBasedFrameDecoder(1024, ETX_DELIM,
                                                                 false, "tcp");
}
seems to do the trick.
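Note that for this to work the client has to actually terminate each message with ETX. In the Python test script above, that means framing the payload before sending it (a minimal sketch; `frame` is a hypothetical helper, not part of the original script):

```python
ETX = b"\x03"

def frame(msg):
    """Append the ETX terminator that the frameDecoder splits on."""
    return msg.encode() + ETX
```

so the client would call `sock.sendall(frame(args.msg))` instead of sending the raw bytes.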

On the flip side though, it seems like this will hang indefinitely (or until lower-level TCP timeouts kick in?) if an ETX frame is never received, and I can't figure out any way to set a timeout on the decoder, so I'd still be eager for input if anyone knows how to do that.

I think the default "timeout" behavior I was seeing might've just been an artifact of Netty's read loop speed (see: How does netty determine when a read is complete?)

                You cannot add a timeout to the decoder, because a connection interruption can also cause you to miss packets for X amount of time in the middle of a message
– Ferrybig
                Feb 23, 2022 at 12:46
                Sure, but a client could also send you a bad message which doesn't include the delimiter character. If you can't know for sure whether something is delayed or never coming, it's reasonable to eventually give up and throw an exception for another level of the program to deal with. Something like netty.io/4.0/api/io/netty/handler/timeout/…, but it seems like I can't use that with Camel because it's not Shareable
– zoesnape
                Feb 23, 2022 at 22:30
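(The "eventually give up" idea can at least be illustrated outside of Camel and Netty. This is a plain-Python sketch of a deadline across a whole delimited read, a hypothetical helper rather than a real decoder option:)

```python
import socket
import time

ETX = b"\x03"

def read_msg_with_deadline(sock, timeout_s=5.0, bufsize=1024):
    """Read from sock until an ETX byte, raising TimeoutError if the
    complete message hasn't arrived within timeout_s seconds."""
    deadline = time.monotonic() + timeout_s
    buf = b""
    while ETX not in buf:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("no ETX within {:.1f}s".format(timeout_s))
        sock.settimeout(remaining)  # cap this recv() at the time left
        try:
            chunk = sock.recv(bufsize)
        except socket.timeout:
            raise TimeoutError("no ETX within {:.1f}s".format(timeout_s))
        if not chunk:
            raise ConnectionError("connection closed before ETX")
        buf += chunk
    return buf.split(ETX, 1)[0]
```

The deadline is fixed when the read starts, so a slow trickle of non-delimiter bytes can't keep the read alive forever, which addresses the mid-message-interruption concern above.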
