fluent-logger-java is a Java library, to record events via Fluentd, from Java application. https://github.com/fluent/fluent-logger-java

使用该sdk过程发现,tcp连接断开之后,该sdk的重连机制无效。

2018-01-26 12:36:25,620 ERROR [org.fluentd.logger.sender.RawSocketSender] - <org.fluentd.logger.sender.RawSocketSender>
java.net.SocketException: Software caused connection abort: socket write error
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
	at org.fluentd.logger.sender.RawSocketSender.flush(RawSocketSender.java:200)
	at org.fluentd.logger.sender.RawSocketSender.send(RawSocketSender.java:188)
	at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:158)
	at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:140)
	at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:135)
	at org.fluentd.logger.FluentLogger.log(FluentLogger.java:101)
	at org.fluentd.logger.FluentLogger.log(FluentLogger.java:86)
	at fluentdDemo.fluentdDemo.main(fluentdDemo.java:90)

查看源码:见RawSocketSender类

private void reconnect() throws IOException {
        if (socket == null) {
            connect();
        } else if (socket.isClosed() || (!socket.isConnected())) {
            close();
            connect();

判断 Socket 远程端连接如果关闭的话,就要重建连接。Socket的类提供了一些已经封装好的方法, 如  isClosed()、isConnected()、isInputStreamShutdown()、isOutputStreamShutdown()等,

在测试时发现,这些方法都是本地端的状态,无法判断远端是否已经断开连接。

有些同学处理类似问题时,通过OutputStream发送一段测试数据,如果发送失败就表示远端已经断开连接,类似ping,但是这样会影响到正常的输出数据,远端无法把正常数据和测试数据分开。

其实,这种方法也是可以的,只不过,不要发送测试数据,直接发送需要发送的数据,一旦失败,就主动close socket,再新建连接,再重新发送就行了。

也有些同学想到通过发送紧急数据,来验证连接状态,见socket类(如下),如果失败,就close socket,再新建连接。

* Send one byte of urgent data on the socket. The byte to be sent is the lowest eight * bits of the data parameter. The urgent byte is * sent after any preceding writes to the socket OutputStream * and before any future writes to the OutputStream. * @param data The byte of data to send * @exception IOException if there is an error * sending the data. * @since 1.4 public void sendUrgentData (int data) throws IOException { if (!getImpl().supportsUrgentData ()) { throw new SocketException ("Urgent data not supported"); getImpl().sendUrgentData (data);

可通过如下写法实现:

* 判断是否断开连接,断开返回true,没有返回false * @param socket * @return public Boolean isServerClose(Socket socket){ socket.sendUrgentData(0xFF);//发送1个字节的紧急数据,默认情况下,服务器端没有开启紧急数据处理,不影响正常通信 return false; }catch(Exception se){ return true;

前提:对方Socket的SO_OOBINLINE属性没有打开,就会自动舍弃这个字节,而SO_OOBINLINE属性默认情况下就是关闭的

见SocketOptions接口

* When the OOBINLINE option is set, any TCP urgent data received on * the socket will be received through the socket input stream. * When the option is disabled (which is the default) urgent data * is silently discarded. * @see Socket#setOOBInline * @see Socket#getOOBInline @Native public final static int SO_OOBINLINE = 0x1003;

当然,我觉得也可以通过定时发送紧急数据来做心跳,确保tcp长连接保活,对方可以不用回应。

测试结果:

这两种方式再连接断开后的第一次发送数据,并没有异常,但是server端没收到数据。第二次发送时候,才检测到连接异常。

有同学的说法是:Socket通过发送数据sendUrgentData()或PrintWriter 发送数据时的数据太小,被放到缓冲区没用实时发送导致的。后来尝试设置setSendBufferSize(1)发现能够正常出现异常,这样就能够判断实时网络连接断开了。(网上资料说sendUrgentData是实时发送数据不经过缓冲区的,但跟我实际测试的不一样,有待验证)

查看了一下源码,紧急数据的发送时间是,在之前write到OutputStream之后,在接下来write到OutputStream之前

* Send one byte of urgent data on the socket. The byte to be sent is the lowest eight * bits of the data parameter. The urgent byte is * sent after any preceding writes to the socket OutputStream * and before any future writes to the OutputStream. * @param data The byte of data to send * @exception IOException if there is an error * sending the data. * @since 1.4 public void sendUrgentData (int data) throws IOException { if (!getImpl().supportsUrgentData ()) { throw new SocketException ("Urgent data not supported"); getImpl().sendUrgentData (data);

  尝试设置setSendBufferSize(1)发现能够正常出现异常,这样就能够判断实时网络连接断开了。

fluentd的in_forward插件提供了基于udp的心跳监听,遗憾的是fluent-logger-java并没有做对应的心跳机制。

https://docs.fluentd.org/v0.12/articles/in_forward