import java.io.FileInputStream;
import java.io.InputStream;
import java.security.KeyStore;
import java.security.PublicKey;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SNIMatcher;
import javax.net.ssl.SNIServerName;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.tcp.SslProvider.SslContextSpec;
public
class
SslConsumer
implements
Consumer<SslContextSpec>
{
private
Logger logger = LoggerFactory.getLogger(SslConsumer.
class
);
private
String keyStorePath;
private
String password = "123456"
;
private
boolean
trustServer =
true
;
//
测试,默认为true
private
String serverCaPath = ""
;
public
SslConsumer(String keyStorePath) {
this
.keyStorePath =
keyStorePath;
@Override
public
void
accept(SslContextSpec t) {
try
{
t.sslContext(createSslContext(loadKeyStore(keyStorePath))).handlerConfigurator(handler
->
{
SSLEngine engine
=
handler.engine();
List
<SNIMatcher> matchers =
new
LinkedList<SNIMatcher>
();
SNIMatcher matcher
=
new
SNIMatcher(0
) {
@Override
public
boolean
matches(SNIServerName serverName) {
//
返回true,不验证主机名
return
true
;
matchers.add(matcher);
SSLParameters params
=
new
SSLParameters();
params.setSNIMatchers(matchers);
engine.setSSLParameters(params);
}
catch
(Exception e) {
private
SslContext createSslContext(KeyStore keyStore)
throws
Exception {
SslContextBuilder builder
=
SslContextBuilder.forClient();
KeyManagerFactory keyMgrFactory
= KeyManagerFactory.getInstance("SunX509"
);
keyMgrFactory.init(keyStore, password.toCharArray());
builder.keyManager(keyMgrFactory);
SSLX509TrustMgr trustMgr
=
null
;
if
(trustServer) {
trustMgr
=
new
SSLX509TrustMgr();
}
else
{
trustMgr
=
new
SSLX509TrustMgr(getSeverPublicKey(serverCaPath));
builder.trustManager(trustMgr);
List
<String> ciper =
new
ArrayList<String>
();
ciper.add(
"TLS_RSA_WITH_AES_128_GCM_SHA256"
);
ciper.add(
"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256"
);
ciper.add(
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"
);
builder.ciphers(ciper);
return
builder.build();
private
KeyStore loadKeyStore(String keyStorePath) {
KeyStore keyStore
=
null
;
InputStream in
=
null
;
try
{
in
=
new
FileInputStream(keyStorePath);
keyStore
=
KeyStore.getInstance(KeyStore.getDefaultType());
keyStore.load(in, password.toCharArray());
}
catch
(Exception e) {
logger.error(
""
, e);
return
keyStore;
private
PublicKey getSeverPublicKey(String serverCaPath) {
PublicKey key
=
null
;
InputStream in
=
null
;
try
{
in
=
new
FileInputStream(serverCaPath);
X509Certificate cert
= (X509Certificate) CertificateFactory.getInstance("X.509"
).generateCertificate(in);
key
=
cert.getPublicKey();
}
catch
(Exception e) {
logger.error(
""
, e);
return
key;
验证服务器证书的实现类
目前没有做任何处理,有需要验证服务器证书时,可以在对应的重写方法中进行处理
package com.demo.client;
import java.security.PublicKey;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import javax.net.ssl.X509TrustManager;
/**
 * Trust manager used by the demo client.
 *
 * <p>NOTE(review): neither {@link #checkClientTrusted} nor {@link #checkServerTrusted}
 * performs any validation, so every certificate chain is accepted, and the stored
 * server public key is never consulted. Add real verification in those methods
 * before using this class outside of a test environment.
 */
public class SSLX509TrustMgr implements X509TrustManager {

    /** Server public key supplied by the caller; currently not read by any check. */
    private final PublicKey serverPublicKey;

    /** Creates a trust manager without a server public key. */
    public SSLX509TrustMgr() {
        this(null);
    }

    /**
     * @param serverPublicKey public key of the server certificate; may be {@code null}
     */
    public SSLX509TrustMgr(PublicKey serverPublicKey) {
        this.serverPublicKey = serverPublicKey;
    }

    /** Accepts every client chain; no validation is performed. */
    @Override
    public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
        // Intentionally empty.
    }

    /** Accepts every server chain; no validation is performed. */
    @Override
    public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
        // Intentionally empty.
    }

    /**
     * @return an empty array; the interface contract requires a non-null result
     */
    @Override
    public X509Certificate[] getAcceptedIssuers() {
        return new X509Certificate[0];
    }
}
WebClient工具类
package com.demo.utils;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import com.demo.entity.SslConsumer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ConnectTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;
import reactor.retry.Backoff;
import reactor.retry.Retry;
public class WebClientUtil {
private static Logger logger = LoggerFactory.getLogger(WebClientUtil.class);
private static Map<String, WebClient> webClientMap = new ConcurrentHashMap<String, WebClient>();
public static WebClient createWebClient(String keyStorePath) {
ConnectionProvider provider = ConnectionProvider.builder("wc-").maxConnections(30)
.maxIdleTime(Duration.ofSeconds(30)).maxLifeTime(Duration.ofSeconds(30))
.pendingAcquireTimeout(Duration.ofSeconds(30)).build();
LoopResources loop = LoopResources.create("loop-", 20, 20, true);
TcpClient tcpClient = TcpClient.create(provider).secure(new SslConsumer(keyStorePath)).runOn(loop)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000).doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(30)).addHandlerLast(new WriteTimeoutHandler(20)));
return WebClient.builder().clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient))).build();
// 如下方式可能会一直循环生成loop线程,直到程序僵死
// HttpClient httpClient = HttpClient.create(provider).secure(new SslConsumer(keyStorePath))
// .tcpConfiguration(client -> client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
// .doOnConnected(con -> con.addHandlerLast(new ReadTimeoutHandler(20))
// .addHandlerLast(new WriteTimeoutHandler(10)))
// .runOn(LoopResources.create("loop-", 20, 20, true)));
// return WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).build();
public static Mono<String> doPost(String url, String data, String keyStorePath) {
WebClient webClient = webClientMap.get(url);
if (webClient == null) {
webClient = createWebClient(keyStorePath);
WebClient putIfAbsent = webClientMap.putIfAbsent(url, webClient);
if (putIfAbsent != null) {
webClient = putIfAbsent;
Retry<?> retry = Retry.anyOf(ConnectTimeoutException.class, NoRouteToHostException.class).retryMax(3)
.backoff(Backoff.fixed(Duration.ofMillis(100)));
Mono<String> mono = webClient.post().uri(url).contentType(MediaType.APPLICATION_JSON).bodyValue(data).retrieve()
.bodyToMono(String.class).timeout(Duration.ofSeconds(30)).doOnError(ConnectException.class, e -> {
logger.error("", e);
}).doOnError(NoRouteToHostException.class, e -> {
logger.error("", e);
}).retryWhen(retry);
return mono;
1、demo使用的springboot版本为2.3.3
2、HttpClient的create方法与secure方法不能分开,否则secure方法可能不生效
3、ConnectionProvider中有很多方法已经过时,比如fixed、elastic,不建议使用。源码在这些过时方法上也给出了对应的替代方法:fixed可以用create方法替代,elastic可以用builder方法替代。create方法会直接创建ConnectionProvider,其重载方法可以设置maxConnections,但不能设置连接的最大空闲时间、最大生命周期等;builder方法可以对连接池进行更精细的管理,故本例使用builder方法和build方法来创建ConnectionProvider。如果使用默认的provider,最大连接数默认为500,且maxIdleTime和maxLifeTime为-1
4、LoopResources为处理响应消息的线程数,2.3.3版本最小值为4
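5、ReactorClientHttpConnector存在一个重载的构造方法:ReactorClientHttpConnector(ReactorResourceFactory factory, Function<HttpClient, HttpClient> mapper)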
在该方法中,factory可以设置ConnectionProvider和LoopResources,mapper可以设置HttpClient,但在使用时可能是使用方式不正确,一直都是要么线程池配置上了,但SSL不可用,或者是SSL可用,但线程池不可用,后在ReactorClientHttpConnector两个参数的构造方法中看到initHttpClient方法,该方法中,使用了runOn配置LoopResources,故本例中也是用了runOn来配置。
@Override
public void run(String... args) throws Exception {
int count = 6;
for (int i = 0; i < count; i++) {
logger.info("start: {}", i);
JsonObject obj = new JsonObject();
obj.addProperty("name", "name"+i);
obj.addProperty("age", i);
Mono<String> mono = WebClientUtil.doPost(url, obj.toString(), keyStorePath);
mono.subscribe(new MainConsumer(i));
logger.info("end: {}", i);
class MainConsumer implements Consumer<String> {
private int i;
public MainConsumer(int i) {
this.i = i;
@Override
public void accept(String t) {
logger.info("receive: {}, loop: {}", t, i);
View Code
[INFO] main 14:44:30 WebclientdemoApplication:61(logStarted) Started WebclientdemoApplication in 3.453 seconds (JVM running for 4.137)
[INFO] main 14:44:30 MainBusiStart:29(run) start: 0
[INFO] main 14:44:31 MainBusiStart:35(run) end: 0
[INFO] main 14:44:31 MainBusiStart:29(run) start: 1
[INFO] main 14:44:31 MainBusiStart:35(run) end: 1
[INFO] main 14:44:31 MainBusiStart:29(run) start: 2
[INFO] main 14:44:31 MainBusiStart:35(run) end: 2
[INFO] main 14:44:31 MainBusiStart:29(run) start: 3
[INFO] main 14:44:31 MainBusiStart:35(run) end: 3
[INFO] main 14:44:31 MainBusiStart:29(run) start: 4
[INFO] main 14:44:31 MainBusiStart:35(run) end: 4
[INFO] main 14:44:31 MainBusiStart:29(run) start: 5
[INFO] main 14:44:31 MainBusiStart:35(run) end: 5
[INFO] loop--nio-4 14:44:31 MainBusiStart:48(accept) receive: {"name":"name3","age":3.0}, loop: 3
[INFO] loop--nio-5 14:44:31 MainBusiStart:48(accept) receive: {"name":"name4","age":4.0}, loop: 4
[INFO] loop--nio-1 14:44:31 MainBusiStart:48(accept) receive: {"name":"name0","age":0.0}, loop: 0
[INFO] loop--nio-2 14:44:31 MainBusiStart:48(accept) receive: {"name":"name1","age":1.0}, loop: 1
[INFO] loop--nio-3 14:44:31 MainBusiStart:48(accept) receive: {"name":"name2","age":2.0}, loop: 2
[INFO] loop--nio-6 14:44:31 MainBusiStart:48(accept) receive: {"name":"name5","age":5.0}, loop: 5
View Code
6、本例中,使用了一个reactor-extra进行重试处理
<dependency>
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-extra</artifactId>
</dependency>
这种重试方式虽然简单好用,但已经被标记为过时,
应该改用接收reactor.util.retry.Retry类型参数的retryWhen方法
7、如果使用springcloud,则需要考虑springboot和springcloud的兼容问题,以及当前程序的springboot和springcloud版本与其他需要远程调用的版本的兼容