相关文章推荐
刀枪不入的钥匙扣  ·  hasura graphql ...·  1 年前    · 
买醉的吐司  ·  Linux系统 ...·  1 年前    · 

Spring Integration的TCP 和 UDP 支持(二)_TCP

网关

网关会自动关联消息。 但是,对于容量相对较小的应用程序,应使用出站网关。 将连接工厂配置为对所有消息对使用单个共享连接(“single-use=”false“)时,一次只能处理一条消息。 新消息必须等到收到对上一条消息的答复。 当为每个新消息配置连接工厂以使用新连接(“single-use=”true“)时,此限制不适用。 虽然此设置可以提供比共享连接环境更高的吞吐量,但它会带来为每个消息对打开和关闭新连接的开销。

因此,对于高容量消息,请考虑使用一对协作通道适配器。 但是,为此,您需要提供协作逻辑。

Spring Integration 2.2 中引入的另一个解决方案是使用 ,它允许使用共享连接池。​ ​CachingClientConnectionFactory​

协作出站和入站通道适配器

若要实现高容量吞吐量(避免使用网关的陷阱,如前所述),可以配置一对协作的出站和入站通道适配器。 您还可以使用协作适配器(服务器端或客户端)进行完全异步的通信(而不是使用请求-答复语义)。 在服务器端,消息关联由适配器自动处理,因为入站适配器添加一个标头,允许出站适配器确定在发送回复消息时使用哪个连接。

在服务器端,必须填充标头,因为它用于将消息与连接相关联。 源自入站适配器的消息会自动设置标头。 如果您希望构造要发送的其他消息,则需要设置标头。 您可以从传入消息中获取标头值。​ ​ip_connectionId​

在客户端,应用程序必须根据需要提供自己的关联逻辑。 您可以通过多种方式执行此操作。

如果消息有效负载具有一些自然的相关数据(如事务 ID 或订单号),并且您不需要保留原始出站消息中的任何信息(如回复通道标头),则关联很简单,在任何情况下都会在应用程序级别完成。

如果消息有效负载具有一些自然相关数据(如事务 ID 或订单号),但需要保留原始出站邮件中的某些信息(如回复通道标头),则可以保留原始出站消息的副本(可能通过使用发布-订阅通道),并使用聚合器重新组合必要的数据。

对于前两种方案中的任何一种,如果有效负载没有自然相关数据,则可以在出站通道适配器的上游提供转换器,以使用此类数据增强有效负载。 此类转换器可以将原始有效负载转换为包含原始有效负载和消息标头的某些子集的新对象。 当然,标头中的活动对象(例如回复通道)不能包含在转换后的有效负载中。

如果选择此类策略,则需要确保连接工厂具有适当的序列化程序-反序列化程序对来处理此类有效负载(例如,使用 java 序列化的 and 或自定义序列化程序和反序列化程序)。 TCP 连接工厂中提到的选项(包括默认值)不支持此类有效负载,除非转换后的有效负载是 或 。​ ​DefaultSerializer​ ​​ ​DefaultDeserializer​ ​​ ​ByteArray*Serializer​ ​​ ​ByteArrayCrLfSerializer​ ​​ ​String​ ​​ ​byte[]​


在 2.2 版本之前,当协作通道适配器使用客户端连接工厂时,该属性默认为默认回复超时(10 秒)。 这意味着,如果入站适配器在这段时间内未收到任何数据,则套接字将关闭。​ ​so-timeout​



此默认行为不适用于真正的异步环境,因此它现在默认为无限超时。 可以通过将客户端连接工厂上的属性设置为 10000 毫秒来恢复以前的默认行为。​ ​so-timeout​


从版本 5.4 开始,多个出站通道适配器和一个可以共享同一个连接工厂。 这允许应用程序同时支持请求/回复和任意服务器→客户端消息传递。 有关详细信息,请参阅 TCP 网关。​ ​TcpInboundChannelAdapter​

传输标头

TCP 是一种流式处理协议。 并在流中划分消息。 在 3.0 之前,只能通过 TCP 传输消息负载( 或 )。 从 3.0 开始,您可以传输选定的标头以及有效负载。 但是,“活动”对象(如标头)不能序列化。​ ​Serializers​ ​​ ​Deserializers​ ​​ ​String​ ​​ ​byte[]​ ​​ ​replyChannel​

通过 TCP 发送标头信息需要一些额外的配置。

第一步是提供使用该属性的 。 此映射器委托给任何实现,以将消息与某个对象相互转换,这些对象可由配置的 和 进行序列化和反序列化。​ ​ConnectionFactory​ ​​ ​MessageConvertingTcpMessageMapper​ ​​ ​mapper​ ​​ ​MessageConverter​ ​​ ​serializer​ ​​ ​deserializer​

Spring 集成提供了一个 ,它允许指定添加到对象的标头列表以及有效负载。 生成的映射有两个条目:和 。 该条目本身是 a,包含选定的标头。​ ​MapMessageConverter​ ​​ ​Map​ ​​ ​payload​ ​​ ​headers​ ​​ ​headers​ ​​ ​Map​

第二步是提供一个串行器和一个可以在 a 和某种线格式之间进行转换的反序列化程序。 这可以是自定义或,如果对等系统不是 Spring 集成应用程序,则通常需要它。​ ​Map​ ​​ ​Serializer​ ​​ ​Deserializer​

Spring Integration 提供了一个将 a 与 JSON 之间的转换。 它使用弹簧集成。 如果需要,您可以提供自定义。 默认情况下,序列化程序在对象之间插入换行符 () 字符。 有关更多信息,请参阅 Javadoc。​ ​MapJsonSerializer​ ​​ ​Map​ ​​ ​JsonObjectMapper​ ​​ ​JsonObjectMapper​ ​​ ​0x0a​

使用类路径上的任何版本。​ ​JsonObjectMapper​ ​​ ​Jackson​

还可以使用 和 来使用 的标准 Java 序列化 。​ ​Map​ ​​ ​DefaultSerializer​ ​​ ​DefaultDeserializer​

以下示例显示了使用 JSON 传输 、 和标头的连接工厂的配置:​ ​correlationId​ ​​ ​sequenceNumber​ ​​ ​sequenceSize​

<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
mapper="mapper"
serializer="jsonSerializer"
deserializer="jsonSerializer"/>

<bean id="mapper"
class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
<constructor-arg name="messageConverter">
<bean class="o.sf.integration.support.converter.MapMessageConverter">
<property name="headerNames">
<list>
<value>correlationId</value>
<value>sequenceNumber</value>
<value>sequenceSize</value>
</list>
</property>
</bean>
</constructor-arg>
</bean>

<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />

使用上述配置发送的消息,有效负载为“something”,将出现在网络上,如下所示:

{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}

关于非阻塞 I/O (NIO)

使用 NIO(请参阅 IP 配置属性)可避免将线程专用于从每个套接字读取。 对于少量套接字,您可能会发现不使用 NIO 以及异步切换(例如到 a)的性能与使用 NIO 一样好或更好。​ ​using-nio​ ​​ ​QueueChannel​

在处理大量连接时,应考虑使用 NIO。 但是,使用NIO还有其他一些后果。 线程池(在任务执行器中)在所有套接字之间共享。 每个传入消息都经过组装并发送到配置的通道,作为从该池中选择的线程上的单独工作单元。 到达同一套接字的两个连续消息可能由不同的线程处理。 这意味着消息发送到通道的顺序是不确定的。 不维护到达套接字的消息的严格顺序。

对于某些应用程序,这不是问题。 对于其他人来说,这是一个问题。 如果需要严格排序,请考虑设置和使用异步切换。​ ​using-nio​ ​​ ​false​

或者,您可以在入站终端节点的下游插入重新排序程序,以将消息返回到其正确的序列。 如果设置为 在连接工厂上,则到达 TCP 连接的消息将设置和标头。 重新排序器使用这些标头将消息返回到其正确的序列。​ ​apply-sequence​ ​​ ​true​ ​​ ​sequenceNumber​ ​​ ​correlationId​

从版本 5.1.4 开始,接受新连接优先于从现有连接读取。 通常,这应该影响不大,除非您的新传入连接率非常高。 如果要恢复到以前授予读取优先级的行为,请将 上的属性设置为 。​ ​multiAccept​ ​​ ​TcpNioServerConnectionFactory​ ​​ ​false​

泳池大小

不再使用池大小属性。 以前,它在未指定任务执行程序时指定默认线程池的大小。 它还用于设置服务器套接字上的连接积压工作。 不再需要第一个函数(请参阅下一段)。 第二个函数由属性替换。​ ​backlog​

以前,当将固定线程池任务执行器(默认)与 NIO 一起使用时,可能会出现死锁并且处理将停止。 当缓冲区已满,从套接字读取的线程尝试向缓冲区添加更多数据,并且没有线程可用于在缓冲区中腾出空间时,会出现此问题。 这只发生在非常小的游泳池大小的情况下,但在极端条件下是可能的。 从 2.2 开始,两个更改消除了这个问题。 首先,默认任务执行器是缓存的线程池执行器。 其次,添加了死锁检测逻辑,以便在发生线程匮乏时,将引发异常而不是死锁,从而释放死锁资源。

现在默认任务执行程序是无限的,如果消息处理需要较长的时间,则传入消息速率较高时可能会出现内存不足情况。 如果应用程序表现出此类行为,则应使用具有适当池大小的池任务执行程序,但请参阅下一节。

具有策略的线程池任务执行器​ ​CALLER_RUNS​

当您将固定线程池与 ( 使用命名空间时)一起使用并且队列容量较小时,应记住一些重要的注意事项。​ ​CallerRunsPolicy​ ​​ ​CALLER_RUNS​ ​​ ​<task/>​

如果不使用固定线程池,则以下内容不适用。

通过NIO连接,有三种不同的任务类型。 I/O 选择器处理在一个专用线程上执行(检测事件、接受新连接以及使用任务执行器将 I/O 读取操作调度到其他线程)。 当 I/O 读取器线程(读取操作调度到该线程)读取数据时,它会传递给另一个线程来组合传入的消息。 大型邮件可能需要多次读取才能完成。 这些“汇编程序”线程可以在等待数据时阻塞。 当发生新的读取事件时,读取器确定此套接字是否已具有汇编程序,如果没有,则运行新的汇编程序。 组装过程完成后,汇编程序线程将返回到池中。

当池耗尽、拒绝策略正在使用且任务队列已满时,这可能会导致死锁。 当池为空且队列中没有空间时,IO 选择器线程将接收事件并使用执行程序调度读取。 队列已满,因此选择器线程本身启动读取过程。 现在,它检测到此套接字没有汇编程序,并在执行读取之前触发汇编程序。 同样,队列已满,选择器线程成为汇编器。 汇编程序现在被阻止,等待读取数据,这永远不会发生。 连接工厂现在已死锁,因为选择器线程无法处理新事件。​ ​CALLER_RUNS​ ​​ ​OP_READ​

为了避免这种死锁,我们必须避免选择器(或读取器)线程执行组装任务。 我们希望对 IO 和程序集操作使用单独的池。

该框架提供了一个 ,它允许配置两个不同的执行器:一个用于执行 IO 操作,另一个用于消息组装。 在此环境中,IO 线程永远不会成为汇编程序线程,并且不会发生死锁。​ ​CompositeExecutor​

此外,任务执行程序应配置为使用 ( 时使用 )。 当 I/O 任务无法完成时,它会延迟一小段时间并不断重试,直到可以完成并分配组装程序。 默认情况下,延迟为 100 毫秒,但您可以通过在连接工厂上设置属性来更改它(使用 XML 命名空间进行配置时)。​ ​AbortPolicy​ ​​ ​ABORT​ ​​ ​<task>​ ​​ ​readDelay​ ​​ ​read-delay​

以下三个示例演示如何配置复合执行程序:

@Bean
private CompositeExecutor compositeExecutor() {
ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
ioExec.setCorePoolSize(4);
ioExec.setMaxPoolSize(10);
ioExec.setQueueCapacity(0);
ioExec.setThreadNamePrefix("io-");
ioExec.setRejectedExecutionHandler(new AbortPolicy());
ioExec.initialize();
ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
assemblerExec.setCorePoolSize(4);
assemblerExec.setMaxPoolSize(10);
assemblerExec.setQueueCapacity(0);
assemblerExec.setThreadNamePrefix("assembler-");
assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
assemblerExec.initialize();
return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg ref="io"/>
<constructor-arg ref="assembler"/>
</bean>

<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="io-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="8" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="assembler-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
</bean>

SSL/TLS 支持

支持安全套接字层/传输层安全性。 使用 NIO 时,JDK 5+ 功能用于在建立连接后处理握手。 不使用 NIO 时,将使用标准和对象来创建连接。 提供了许多策略界面,以允许进行重大自定义。 这些接口的默认实现提供了开始使用安全通信的最简单方法。​ ​SSLEngine​ ​​ ​SSLSocketFactory​ ​​ ​SSLServerSocketFactory​

开始

无论是否使用 NIO,都需要在连接工厂上配置属性。 此属性引用描述所需密钥库的位置和密码的 <bean/> 定义。​ ​ssl-context-support​

SSL/TLS 对等方每个都需要两个密钥存储:

  • 包含用于标识对等方的私钥和公钥对的密钥库
  • 包含受信任的对等方的公钥的信任库。 请参阅 JDK 随附的实用程序的文档。 基本步骤是 keytool
  1. 创建新的密钥对并将其存储在密钥库中。
  2. 导出公钥。
  3. 将公钥导入对等方的信任库。
  4. 对另一个对等方重复此操作。


在测试用例中,在两个对等方上使用相同的密钥存储是很常见的,但在生产中应避免这样做。

建立密钥存储区后,下一步是向 Bean 指示它们的位置,并向连接工厂提供对该 Bean 的引用。​ ​TcpSSLContextSupport​

以下示例配置 SSL 连接:

<bean id="sslContextSupport"
class="o.sf.integration.ip.tcp.connection.support.DefaultTcpSSLContextSupport">
<constructor-arg value="client.ks"/>
<constructor-arg value="client.truststore.ks"/>
<constructor-arg value="secret"/>
<constructor-arg value="secret"/>
</bean>

<ip:tcp-connection-factory id="clientFactory"
type="client"
host="localhost"
port="1234"
ssl-context-support="sslContextSupport" />

该类还有一个可选属性,该属性可以是 或(默认值)。​ ​DefaultTcpSSLContextSupport​ ​​ ​protocol​ ​​ ​SSL​ ​​ ​TLS​

密钥库文件名(前两个构造函数参数)使用 Spring 抽象。 缺省情况下,文件位于类路径上,但您可以使用前缀覆盖它(改为在文件系统上查找文件)。​ ​Resource​ ​​ ​file:​

从版本 4.3.6 开始,使用 NIO 时,可以在连接工厂上指定(以秒为单位)。 此超时(默认值为 30 秒)在等待数据的 SSL 握手期间使用。 如果超过超时,进程将停止并关闭套接字。​ ​ssl-handshake-timeout​

主机验证

从版本 5.0.8 开始,您可以配置是否启用主机验证。 从版本 5.1 开始,默认情况下启用它;禁用它的机制取决于您是否正在使用 NIO。

主机验证用于确保您连接到的服务器与证书中的信息匹配,即使证书受信任也是如此。

使用 NIO 时,请配置 ,例如。​ ​DefaultTcpNioSSLConnectionSupport​

@Bean
public DefaultTcpNioSSLConnectionSupport connectionSupport() {
DefaultTcpSSLContextSupport sslContextSupport = new DefaultTcpSSLContextSupport("test.ks",
"test.truststore.ks", "secret", "secret");
sslContextSupport.setProtocol("SSL");
DefaultTcpNioSSLConnectionSupport tcpNioConnectionSupport =
new DefaultTcpNioSSLConnectionSupport(sslContextSupport, false);
return tcpNioConnectionSupport;
}

第二个构造函数参数禁用主机验证。 然后将豆子注入蔚来连接工厂。​ ​connectionSupport​

不使用蔚来时,配置位于:​ ​TcpSocketSupport​

connectionFactory.setTcpSocketSupport(new DefaultTcpSocketSupport(false));

同样,构造函数参数禁用主机验证。

先进的技术

本节介绍在某些情况下可能会有用的高级技术。

策略界面

在许多情况下,前面描述的配置就是通过 TCP/IP 启用安全通信所需的全部配置。 但是,Spring Integration 提供了许多策略接口,允许自定义和修改套接字工厂和套接字:

  • ​TcpSSLContextSupport​
  • ​TcpSocketFactorySupport​
  • ​TcpSocketSupport​
  • ​TcpNetConnectionSupport​
  • ​TcpNioConnectionSupport​
策略界面​ ​TcpSSLContextSupport​

以下清单显示了策略界面:​ ​TcpSSLContextSupport​

public interface TcpSSLContextSupport {

SSLContext getSSLContext() throws Exception;

}

接口的实现负责创建对象。 框架提供的实现是前面描述的。 如果需要不同的行为,请实现此接口,并向连接工厂提供对类实现的 Bean 的引用。​ ​TcpSSLContextSupport​ ​​ ​SSLContext​ ​​ ​DefaultTcpSSLContextSupport​

策略界面​ ​TcpSocketFactorySupport​

以下清单显示了策略接口的定义:​ ​TcpSocketFactorySupport​

public interface TcpSocketFactorySupport {

ServerSocketFactory getServerSocketFactory();

SocketFactory getSocketFactory();

}

此接口的实现负责获取对 和 的引用。 提供了两种实现。 第一个是非 SSL 套接字(未定义属性时)。 这将使用 JDK 的默认工厂。 第二个实现是 。 默认情况下,在定义属性时使用此选项。 它使用该 Bean 创建的 来创建套接字工厂。​ ​ServerSocketFactory​ ​​ ​SocketFactory​ ​​ ​DefaultTcpNetSocketFactorySupport​ ​​ ​ssl-context-support​ ​​ ​DefaultTcpNetSSLSocketFactorySupport​ ​​ ​ssl-context-support​ ​​ ​SSLContext​

仅当 是 时,此接口才适用。 蔚来不使用插座工厂。​ ​using-nio​ ​​ ​false​

策略界面​ ​TcpSocketSupport​

以下清单显示了策略接口的定义:​ ​TcpSocketSupport​

public interface TcpSocketSupport {

void postProcessServerSocket(ServerSocket serverSocket);

void postProcessSocket(Socket socket);

}

此接口的实现可以在创建套接字之后以及应用所有配置的属性之后但在使用套接字之前修改套接字。 无论您是否使用NIO,这都适用。 例如,您可以使用此接口的实现来修改 SSL 套接字上支持的密码套件,也可以添加在 SSL 握手完成后收到通知的侦听器。 框架提供的唯一实现是 ,它不会以任何方式修改套接字。​ ​DefaultTcpSocketSupport​

要提供您自己的 or 实现,请分别通过设置 and 属性为连接工厂提供对自定义类型的 Bean 的引用。​ ​TcpSocketFactorySupport​ ​​ ​TcpSocketSupport​ ​​ ​socket-factory-support​ ​​ ​socket-support​

策略界面​ ​TcpNetConnectionSupport​

以下清单显示了策略接口的定义:​ ​TcpNetConnectionSupport​

public interface TcpNetConnectionSupport {

TcpNetConnection createNewConnection(Socket socket,
boolean server, boolean lookupHost,
ApplicationEventPublisher applicationEventPublisher,
String connectionFactoryName) throws Exception;

}

调用此接口以创建类型(或其子类)的对象。 框架提供单个实现 (),默认情况下,该实现创建简单对象。 它有两个属性:和 。 启用回推后,实现将返回一个子类,该子类将连接包装在 . 与默认值对齐时,缓冲区大小默认为 1。 这允许反序列化程序将“未读”(推回)字节到流中。 下面的简单示例演示如何在委派反序列化程序中使用它,该反序列化程序“窥视”第一个字节以确定要调用的反序列化程序:​ ​TcpNetConnection​ ​​ ​DefaultTcpNetConnectionSupport​ ​​ ​TcpNetConnection​ ​​ ​pushbackCapable​ ​​ ​pushbackBufferSize​ ​​ ​InputStream​ ​​ ​PushbackInputStream​ ​​ ​PushbackInputStream​

public class CompositeDeserializer implements Deserializer<byte[]> {

private final ByteArrayStxEtxSerializer stxEtx = new ByteArrayStxEtxSerializer();

private final ByteArrayCrLfSerializer crlf = new ByteArrayCrLfSerializer();

@Override
public byte[] deserialize(InputStream inputStream) throws IOException {
PushbackInputStream pbis = (PushbackInputStream) inputStream;
int first = pbis.read();
if (first < 0) {
throw new SoftEndOfStreamException();
}
pbis.unread(first);
if (first == ByteArrayStxEtxSerializer.STX) {
this.receivedStxEtx = true;
return this.stxEtx.deserialize(pbis);
}
else {
this.receivedCrLf = true;
return this.crlf.deserialize(pbis);
}
}

}
策略界面​ ​TcpNioConnectionSupport​

以下清单显示了策略接口的定义:​ ​TcpNioConnectionSupport​

public interface TcpNioConnectionSupport {

TcpNioConnection createNewConnection(SocketChannel socketChannel,
boolean server, boolean lookupHost,
ApplicationEventPublisher applicationEventPublisher,
String connectionFactoryName) throws Exception;

}

调用此接口以创建对象(或子类中的对象)。 Spring 集成提供了两种实现:和 . 使用哪一个取决于是否正在使用 SSL。 一个常见的用例是子类化和覆盖 。 请参阅 SSL 客户端身份验证示例。 与 一样,这些实现也支持回推。​ ​TcpNioConnection​ ​​ ​DefaultTcpNioSSLConnectionSupport​ ​​ ​DefaultTcpNioConnectionSupport​ ​​ ​DefaultTcpNioSSLConnectionSupport​ ​​ ​postProcessSSLEngine​ ​​ ​DefaultTcpNetConnectionSupport​

示例:启用 SSL 客户端身份验证

若要在使用 SSL 时启用客户端证书身份验证,该技术取决于您是否使用 NIO。 如果不这样做,请提供自定义实现以对服务器套接字进行后处理:​ ​TcpSocketSupport​

serverFactory.setTcpSocketSupport(new DefaultTcpSocketSupport() {

@Override
public void postProcessServerSocket(ServerSocket serverSocket) {
((SSLServerSocket) serverSocket).setNeedClientAuth(true);
}

});

(使用 XML 配置时,请通过设置属性来提供对 Bean 的引用)。​ ​socket-support​

使用 NIO 时,请提供自定义实现以对 进行后处理,如以下示例所示:​ ​TcpNioSslConnectionSupport​ ​​ ​SSLEngine​

@Bean
public DefaultTcpNioSSLConnectionSupport tcpNioConnectionSupport() {
return new DefaultTcpNioSSLConnectionSupport(serverSslContextSupport) {

@Override
protected void postProcessSSLEngine(SSLEngine sslEngine) {
sslEngine.setNeedClientAuth(true);
}

}
}

@Bean
public TcpNioServerConnectionFactory server() {
...
serverFactory.setTcpNioConnectionSupport(tcpNioConnectionSupport());
...
}

(使用 XML 配置时,从 4.3.7 版开始,通过设置属性来提供对 Bean 的引用)。​ ​nio-connection-support​

IP 配置属性

下表描述了可以设置以配置 IP 连接的属性:

表 1.连接工厂属性

属性名称

客户?

服务器?

允许的值

属性说明

​type​

Y

Y

客户端、服务器

确定连接工厂是客户端还是服务器。

​host​

Y

N

目标的主机名或 IP 地址。

​port​

Y

Y

端口。

​serializer​

Y

Y

用于序列化有效负载的实现。 默认为​ ​Serializer​ ​​ ​ByteArrayCrLfSerializer​

​deserializer​

Y

Y

用于反序列化有效负载的实现。 默认为​ ​Deserializer​ ​​ ​ByteArrayCrLfSerializer​

​using-nio​

Y

Y

​true​ ​​,​ ​false​

连接是否使用 NIO。 有关详细信息,请参阅该软件包。 请参阅​ ​关于非阻塞 I/O (NIO)。​ ​​ 违约:。​ ​java.nio​ ​​ ​false​

​using-direct-buffers​

Y

N

​true​ ​​,​ ​false​

使用 NIO 时,连接是否使用直接缓冲区。 有关详细信息,请参阅文档。 如果为 ,则必须为 。​ ​java.nio.ByteBuffer​ ​​ ​false​ ​​ ​using-nio​ ​​ ​false​

​apply-sequence​

Y

Y

​true​ ​​,​ ​false​

使用 NIO 时,可能需要对消息进行重新排序。 当此属性设置为 时,标头将添加到收到的消息中。 请参阅​ ​关于非阻塞 I/O (NIO)。​ ​​ 违约:。​ ​true​ ​​ ​correlationId​ ​​ ​sequenceNumber​ ​​ ​false​

​so-timeout​

Y

Y

默认为 (无穷大),但带有 的服务器连接工厂除外。 在这种情况下,它默认为默认回复超时(10 秒)。​ ​0​ ​​ ​single-use="true"​

​so-send-buffer-size​

Y

Y

看。​ ​java.net.Socket.​ ​​ ​setSendBufferSize()​

​so-receive-buffer-size​

Y

Y

看。​ ​java.net.Socket.​ ​​ ​setReceiveBufferSize()​

​so-keep-alive​

Y

Y

​true​ ​​,​ ​false​

看。​ ​java.net.Socket.setKeepAlive()​

​so-linger​

Y

Y

设置为 使用提供的值。 看。​ ​linger​ ​​ ​true​ ​​ ​java.net.Socket.setSoLinger()​

​so-tcp-no-delay​

Y

Y

​true​ ​​,​ ​false​

看。​ ​java.net.Socket.setTcpNoDelay()​

​so-traffic-class​

Y

Y

看。​ ​java.net.Socket.​ ​​ ​setTrafficClass()​

​local-address​

N

Y

在多宿主系统上,指定套接字绑定到的接口的 IP 地址。

​task-executor​

Y

Y

指定要用于套接字处理的特定执行程序。 如果未提供,则使用内部缓存线程执行程序。 在某些需要使用特定任务执行程序的平台上需要,例如 .​ ​WorkManagerTaskExecutor​

​single-use​

Y

Y

​true​ ​​,​ ​false​

指定一个连接是否可以用于多条消息。 如果为 ,则对每条消息使用一个新连接。​ ​true​

​pool-size​

N

N

不再使用此属性。 为了向后兼容,它会设置积压工作,但应用于在服务器工厂中指定连接积压工作。​ ​backlog​

​backlog​

N

Y

设置服务器工厂的连接积压工作。

​lookup-host​

Y

Y

​true​ ​​,​ ​false​

指定是否对 IP 地址执行反向查找以转换为主机名以在邮件头中使用。 如果为 false,则改用 IP 地址。 违约:。​ ​false​

​interceptor-factory-chain​

Y

Y

请参阅 ​ ​TCP 连接拦截器​ ​。

​ssl-context-support​

Y

Y

看。​ ​SSL/TLS Support​

​socket-factory-support​

Y

Y

看。​ ​SSL/TLS Support​

​socket-support​

Y

Y

请参阅 ​ ​SSL/TLS 支持​ ​。

​nio-connection-support​

Y

Y

请参阅​ ​高级技术​ ​。

​read-delay​

Y

Y

长> 0

由于线程不足而上一次尝试失败后重试读取之前的延迟(以毫秒为单位)。 默认值:100。 仅在 是 时才适用。​ ​using-nio​ ​​ ​true​

下表描述了可以设置以配置 UDP 入站通道适配器的属性:

表 2.UDP 入站通道适配器属性

属性名称

允许的值

属性说明

​port​

适配器侦听的端口。

​multicast​

​true​ ​​,​ ​false​

UDP 适配器是否使用多播。

​multicast-address​

当多播为 true 时,适配器加入的多播地址。

​pool-size​

指定可以同时处理的数据包数。 仅当未配置任务执行程序时,它才适用。 默认值:5。

任务执行器

指定要用于套接字处理的特定执行程序。 如果未提供,则使用内部池执行程序。 在某些需要使用特定任务执行程序(如 . 有关线程要求,请参阅池大小。​ ​WorkManagerTaskExecutor​

​receive-buffer-size​

用于接收 的缓冲区的大小。 通常设置为最大传输单元 (MTU) 大小。 如果使用的缓冲区小于已发送数据包的大小,则可能会发生截断。 您可以使用该属性来检测此问题。​ ​DatagramPackets​ ​​ ​check-length​

​check-length​

​true​ ​​,​ ​false​

UDP 适配器是否需要在收到的数据包中使用数据长度字段。 用于检测数据包截断。

​so-timeout​

有关详细信息,请参阅中的方法。​ ​setSoTimeout()​ ​​ ​java.net.DatagramSocket​

​so-send-buffer-size​

用于 UDP 确认数据包。 有关详细信息,请参阅 setSendBufferSize() 方法。​ ​java.net.DatagramSocket​

​so-receive-buffer-size​

有关详细信息,请参阅。​ ​java.net.DatagramSocket.setReceiveBufferSize()​

​local-address​

在多宿主系统上,指定套接字绑定到的接口的 IP 地址。

​error-channel​

如果下游组件引发异常,则包含异常和失败消息的消息将发送到此通道。​ ​MessagingException​

​lookup-host​

​true​ ​​,​ ​false​

指定是否对 IP 地址执行反向查找以转换为主机名以在邮件头中使用。 如果为 ,则改用 IP 地址。 违约:。​ ​false​ ​​ ​false​

下表描述了可以设置以配置 UDP 出站通道适配器的属性:

表 3.UDP 出站通道适配器属性

属性名称

允许的值

属性说明

​host​

目标的主机名或 IP 地址。 对于组播 udp 适配器,为组播地址。

​port​

目标上的端口。

​multicast​

​true​ ​​,​ ​false​

UDP 适配器是否使用多播。

​acknowledge​

​true​ ​​,​ ​false​

UDP 适配器是否需要来自目标的确认。 启用后,需要设置以下四个属性:、、 和 。​ ​ack-host​ ​​ ​ack-port​ ​​ ​ack-timeout​ ​​ ​min-acks-for- success​

​ack-host​

When is ,指示应向其发送确认的主机或 IP 地址。 通常是当前主机,但可能不同 — 例如,使用网络地址转换 (NAT) 时。​ ​acknowledge​ ​​ ​true​

​ack-port​

当 是 时,指示应将确认发送到的端口。 适配器在此端口上侦听确认。​ ​acknowledge​ ​​ ​true​

​ack-timeout​

When is 表示适配器等待确认的时间(以毫秒为单位)。 如果未及时收到确认,适配器将引发异常。​ ​acknowledge​ ​​ ​true​

​min-acks-for- success​

默认值为 1。 对于多播适配器,您可以将其设置为更大的值,这需要来自多个目标的确认。

​check-length​

​true​ ​​,​ ​false​

UDP 适配器是否在发送到目标的数据包中包含数据长度字段。

​time-to-live​

对于多播适配器,指定 的生存时间属性。 控制多播的范围。 有关更多信息,请参阅 Java API 文档。​ ​MulticastSocket​

​so-timeout​

请参阅 setSoTimeout() 方法了解更多信息。​ ​java.net.DatagramSocket​

​so-send-buffer-size​

有关详细信息,请参阅中的方法。​ ​setSendBufferSize()​ ​​ ​java.net.DatagramSocket​

​so-receive-buffer-size​

用于 UDP 确认数据包。 有关详细信息,请参阅中的方法。​ ​setReceiveBufferSize()​ ​​ ​java.net.DatagramSocket​

本地地址

在多宿主系统上,对于 UDP 适配器,指定套接字绑定到的接口的 IP 地址以接收回复消息。 对于组播适配器,它还确定组播数据包通过哪个接口发送。

​task-executor​

指定要用于确认处理的特定执行程序。 如果未提供,则使用内部单线程执行程序。 在某些需要使用特定任务执行程序的平台上需要,例如 . 一个线程专用于处理确认(如果选项为 true)。​ ​WorkManagerTaskExecutor​ ​​ ​acknowledge​

​destination-expression​

SpEL 表达式

要计算的 SpEL 表达式,以确定将哪个表达式用作传出 UDP 数据包的目标地址。​ ​SocketAddress​

​socket-expression​

SpEL 表达式

要计算的 SpEL 表达式,用于确定哪个数据报套接字用于发送传出 UDP 数据包。

下表描述了可以设置用于配置 TCP 入站通道适配器的属性:

表 4.TCP 入站通道适配器属性

属性名称

允许的值

属性说明

​channel​

入站消息发送到的通道。

​connection-factory​

如果连接工厂的类型为 ,则工厂由此适配器“拥有”。 如果它的类型为 ,则它由出站通道适配器“拥有”,并且此适配器接收出站适配器创建的连接上的任何传入消息。​ ​server​ ​​ ​client​

​error-channel​

如果下游组件引发异常,则包含异常和失败消息的消息将发送到此通道。​ ​MessagingException​

​client-mode​

​true​ ​​,​ ​false​

当 时,入站适配器充当客户端,负责建立连接,然后在该连接上接收传入消息。 违约:。 另请参阅和。 连接工厂的类型必须为 ,并且已设置为 。​ ​true​ ​​ ​false​ ​​ ​retry-interval​ ​​ ​scheduler​ ​​ ​client​ ​​ ​single-use​ ​​ ​false​

​retry-interval​

在 中时,指定在两次连接尝试之间或连接失败后等待的毫秒数。 默认值:60000(60 秒)。​ ​client-mode​

​scheduler​

​true​ ​​,​ ​false​

指定用于管理连接的 。 如果未指定,则默认为全局 Spring Integration Bean,其默认池大小为 10。 请参阅配置任务计划程序​。​ ​TaskScheduler​ ​​ ​client-mode​ ​​ ​taskScheduler​

下表描述了可以设置以配置 TCP 出站通道适配器的属性:

表 5.TCP 出站通道适配器属性

属性名称

允许的值

属性说明

​channel​

出站消息到达的通道。

​connection-factory​

如果连接工厂的类型为 ,则工厂由此适配器“拥有”。 如果它的类型为 ,则它由入站通道适配器“拥有”,并且此适配器尝试将消息与接收原始入站消息的连接相关联。​ ​client​ ​​ ​server​

​client-mode​

​true​ ​​,​ ​false​

当 时,出站适配器会在启动后立即尝试建立连接。 当 时,发送第一条消息时建立连接。 违约:。 另请参阅和。 连接工厂的类型必须为 ,并且已设置为 。​ ​true​ ​​ ​false​ ​​ ​false​ ​​ ​retry-interval​ ​​ ​scheduler​ ​​ ​client​ ​​ ​single-use​ ​​ ​false​

​retry-interval​

在 中时,指定在两次连接尝试之间或连接失败后等待的毫秒数。 默认值:60000(60 秒)。​ ​client-mode​

​scheduler​

​true​ ​​,​ ​false​

指定用于管理连接的 。 如果未指定,则默认为全局 Spring Integration Bean,其默认池大小为 10。 请参阅配置任务计划程序​。​ ​TaskScheduler​ ​​ ​client-mode​ ​​ ​taskScheduler​

下表描述了可以设置用于配置 TCP 入站网关的属性:

表 6.TCP 入站网关属性

属性名称

允许的值

属性说明

​connection-factory​

连接工厂的类型必须是服务器。

​request-channel​

传入消息发送到的通道。

​reply-channel​

回复消息可能到达的通道。 通常,回复到达添加到入站邮件头的临时回复通道。

​reply-timeout​

网关等待答复的时间(以毫秒为单位)。 默认值:1000(1 秒)。

​error-channel​

如果下游组件引发异常,则包含异常和失败消息的消息将发送到此通道。 然后,来自该流的任何回复都将作为网关的响应返回。​ ​MessagingException​

​client-mode​

​true​ ​​,​ ​false​

当 时,入站网关充当客户端,负责建立连接,然后在该连接上接收(和回复)传入消息。 默认值:假。 另请参阅和。 连接工厂的类型必须为 ,并且已设置为 。​ ​true​ ​​ ​retry-interval​ ​​ ​scheduler​ ​​ ​client​ ​​ ​single-use​ ​​ ​false​

​retry-interval​

在 中时,指定在两次连接尝试之间或连接失败后等待的毫秒数。 默认值:60000(60 秒)。​ ​client-mode​

​scheduler​

​true​ ​​,​ ​false​

指定用于管理连接的 。 如果未指定,则默认为全局 Spring Integration Bean,其默认池大小为 10。 请参阅配置任务计划程序​。​ ​TaskScheduler​ ​​ ​client-mode​ ​​ ​taskScheduler​

下表描述了可以设置用于配置 TCP 出站网关的属性:

表 7.TCP 出站网关属性

属性名称

允许的值

属性说明

​connection-factory​

连接工厂的类型必须是 。​ ​client​

​request-channel​

传出邮件到达的通道。

​reply-channel​

自选。 将回复消息发送到的通道。

​remote-timeout​

网关等待远程系统回复的时间(以毫秒为单位)。 与 互斥。 默认值:10000(10 秒)。 注意:在 4.2 之前的版本中,此值默认为 (如果已设置)。​ ​remote-timeout-expression​ ​​ ​reply-timeout​

​remote-timeout-expression​

针对消息计算的 SpEL 表达式,以确定网关等待远程系统回复的时间(以毫秒为单位)。 与 互斥。​ ​remote-timeout​

​request-timeout​

如果未使用一次性连接工厂,则网关等待访问共享连接的时间(以毫秒为单位)。

​reply-timeout​

网关在向回复通道发送回复时等待的时间(以毫秒为单位)。 仅当回复通道可能阻塞(例如当前已满的有界队列通道)时才适用。

​async​

发送后释放发送线程;回复(或错误)将在接收线程上发送。

​unsolicited​ ​​ ​ ​MessageChannel​

用于发送未经请求的消息和延迟回复的渠道。

IP 消息标头

IP 消息标头

此模块使用以下实例:​ ​MessageHeader​

标头名称

IpHeaders Constant

描述

​ip_hostname​

​HOSTNAME​

从中接收 TCP 消息或 UDP 数据包的主机名。 如果是 ,则包含 IP 地址。​ ​lookupHost​ ​​ ​false​

​ip_address​

​IP_ADDRESS​

从中接收 TCP 消息或 UDP 数据包的 IP 地址。

​ip_port​

​PORT​

UDP 数据包的远程端口。

ip_localInetAddress

​IP_LOCAL_ADDRESS​

套接字连接到的本地(从版本 4.2.5 开始)。​ ​InetAddress​

​ip_ackTo​

​ACKADDRESS​

UDP 应用程序级确认发送到的远程 IP 地址。 该框架在数据包中包含确认信息。

​ip_ackId​

​ACK_ID​

UDP 应用程序级确认的相关 ID。 该框架在数据包中包含确认信息。

​ip_tcp_remotePort​

​REMOTE_PORT​

TCP 连接的远程端口。

​ip_connectionId​

​CONNECTION_ID​

TCP 连接的唯一标识符。 由入站消息的框架设置。 发送到服务器端入站通道适配器或回复入站网关时,此标头是必需的,以便终结点可以确定要将消息发送到的连接。

​ip_actualConnectionId​

​ACTUAL_CONNECTION_ID​

仅供参考。 使用缓存或故障转移客户端连接工厂时,它包含实际的基础连接 ID。

​contentType​

​MessageHeaders.​ ​​ ​ ​CONTENT_TYPE​

入站邮件的可选内容类型 在此表后面描述。 请注意,与其他标头常量不同,此常量位于类中,而不是类中。​ ​MessageHeaders​ ​​ ​IpHeaders​

对于入站消息,、、 和 默认映射。 如果将映射器的属性设置为 ,则映射器将设置标头 (,默认情况下)。 您可以通过设置属性来更改默认值。 可以通过子类化和重写方法来添加其他标头。 例如,使用 SSL 时,可以通过从对象获取会话对象来添加 的属性,该对象作为方法的参数提供。​ ​ip_hostname​ ​​ ​ip_address​ ​​ ​ip_tcp_remotePort​ ​​ ​ip_connectionId​ ​​ ​TcpHeaderMapper​ ​​ ​addContentTypeHeader​ ​​ ​true​ ​​ ​contentType​ ​​ ​application/octet-stream;charset="UTF-8"​ ​​ ​contentType​ ​​ ​TcpHeaderMapper​ ​​ ​supplyCustomHeaders​ ​​ ​SSLSession​ ​​ ​TcpConnection​ ​​ ​supplyCustomHeaders​

对于出站消息,有效负载将转换为默认 () 字符集。 设置属性以更改默认值。​ ​String​ ​​ ​byte[]​ ​​ ​UTF-8​ ​​ ​charset​

自定义映射器属性或子类化时,请将映射器声明为 Bean,并使用属性向连接工厂提供实例。​ ​mapper​

基于注释的配置

示例存储库中的以下示例显示了使用注释而不是 XML 时可用的一些配置选项:

@EnableIntegration 
@IntegrationComponentScan
@Configuration
public static class Config {

@Value(${some.port})
private int port;

@MessagingGateway(defaultRequestChannel="toTcp")
public interface Gateway {

String viaTcp(String in);

}

@Bean
@ServiceActivator(inputChannel="toTcp")
public MessageHandler tcpOutGate(AbstractClientConnectionFactory connectionFactory) {
TcpOutboundGateway gate = new TcpOutboundGateway();
gate.setConnectionFactory(connectionFactory);
gate.setOutputChannelName("resultToString");
return gate;
}

@Bean
public TcpInboundGateway tcpInGate(AbstractServerConnectionFactory connectionFactory) {
TcpInboundGateway inGate = new TcpInboundGateway();
inGate.setConnectionFactory(connectionFactory);
inGate.setRequestChannel(fromTcp());
return inGate;
}

@Bean
public MessageChannel fromTcp() {
return new DirectChannel();
}

@MessageEndpoint
public static class Echo {

@Transformer(inputChannel="fromTcp", outputChannel="toEcho")
public String convert(byte[] bytes) {
return new String(bytes);
}

@ServiceActivator(inputChannel="toEcho")
public String upCase(String in) {
return in.toUpperCase();
}

@Transformer(inputChannel="resultToString")
public String convertResult(byte[] bytes) {
return new String(bytes);
}

}

@Bean
public AbstractClientConnectionFactory clientCF() {
return new TcpNetClientConnectionFactory("localhost", this.port);
}

@Bean
public AbstractServerConnectionFactory serverCF() {
return new TcpNetServerConnectionFactory(this.port);
}

}

标准 Spring 集成注释,为集成应用程序启用基础结构。

搜索接口。​ ​@MessagingGateway​

流客户端的入口点。 调用应用程序可用于此 Bean 并调用其方法。​ ​@Autowired​ ​​ ​Gateway​

出站终结点由包装它的使用者和使用者组成。 在此方案中,根据通道类型配置终结点。​ ​MessageHandler​ ​​ ​@ServiceActivator​

入站终结点(在 TCP/UDP 模块中)都是消息驱动的,因此只需要声明为简单实例。​ ​@Bean​

此类提供了许多用于此示例流的 POJO 方法(服务器端的 a 和以及客户端的 a)。​ ​@Transformer​ ​​ ​@ServiceActivator​ ​​ ​@Transformer​

客户端连接工厂。

服务器端连接工厂。

将 Java DSL 用于 TCP 组件

DSL 对 TCP 组件的支持包括适配器和网关的规范、具有用于创建连接工厂 Bean 的工厂方法的类,以及具有用于创建序列化程序和解序列化程序的工厂方法的类。 有关更多信息,请参阅他们的 javadocs。​ ​Tcp​ ​​ ​TcpCodecs​

下面是使用 DSL 通过 DSL 配置流的一些示例。

例 1.服务器适配器流

@Bean
public IntegrationFlow server() {
return IntegrationFlow.from(Tcp.inboundAdapter(Tcp.netServer(1234)
.deserializer(TcpCodecs.lengthHeader1())
.backlog(30))
.errorChannel("tcpIn.errorChannel")
.id("tcpIn"))
.transform(Transformers.objectToString())
.channel("tcpInbound")
.get();
}

例 2.客户端适配器流

@Bean
public IntegrationFlow client() {
return f -> f.handle(Tcp.outboundAdapter(Tcp.nioClient("localhost", 1234)
.serializer(TcpCodecs.lengthHeader1())));
}

例 3.服务器网关流

@Bean
public IntegrationFlow server() {
return IntegrationFlow.from(Tcp.inboundGateway(Tcp.netServer(1234)
.deserializer(TcpCodecs.lengthHeader1())
.serializer(TcpCodecs.lengthHeader1())
.backlog(30))
.errorChannel("tcpIn.errorChannel")
.id("tcpIn"))
.transform(Transformers.objectToString())
.channel("tcpInbound")
.get();
}

例 4.客户端网关流程

@Bean
public IntegrationFlow client() {
return f -> f.handle(Tcp.outboundGateway(Tcp.nioClient("localhost", 1234)
.deserializer(TcpCodecs.lengthHeader1())
.serializer(TcpCodecs.lengthHeader1())));
}