Reactor Netty 提供了易于使用和配置的 HttpServer 。它隐藏了创建 HTTP 服务器所需的大部分 Netty 的功能,并增加了 Reactive Streams 背压。

5.1.启动和停止

要想启动一个HTTP服务器,您必须创建并且配置一个 HttpServer 实例。默认情况下, host 被配置为任何的本地地址,当执行 bind 操作的时候系统会选择一个临时端口。下面是创建并且配置一个 HttpServer 实例的例子:

github.com/reactor/rea…

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
public class Application {
	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()   //<1>
				          .bindNow(); //<2>
		server.onDispose()
		      .block();

<1> 创建一个HttpServer实例用来进行之后的配置操作。

<2> 使用阻塞等待的方式启动服务器,直到初始化完成。

返回的DisposableServer提供了简单的服务器API,包括disposeNow(),这个方法可以以阻塞等待的方式来关闭服务器。

5.1.1.Host和Port

想要设置特定hostport,您可以用下面的方式来配置HTTP服务器:

github.com/reactor/rea…

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
public class Application {
	public static void main(String[] args) {
		DisposableServer server =
				HttpServer.create()
				          .host("localhost") //<1>
				          .port(8080)        //<2>
				          .bindNow();
		server.onDispose()
		      .block();

<1> 配置HTTP服务器的host

<2> 配置HTTP服务器的port

5.2.预先初始化

默认情况下,HttpServer初始化资源的操作在需要使用的时候才进行。这意味着初始化加载的时候bind operation会占用额外的时间:

  • 事件循环组
  • native传输库(当使用了native传输的时候)
  • 用于安全性的native库(使用了OpenSsl的时候)
  • 当您需要预加载这些资源的时候,您可以按照以下方式来配置HttpServer

    github.com/reactor/rea…

    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    public class Application {
    	public static void main(String[] args) {
    		HttpServer httpServer =
    				HttpServer.create()
    				          .handle((request, response) -> request.receive().then());
    		httpServer.warmup() //<1>
    		          .block();
    		DisposableServer server = httpServer.bindNow();
    		server.onDispose()
    		      .block();
    

    <1> 初始化和加载事件循环组,native传输库和用于安全性的native库

    5.3.HTTP路由

    想要给HTTP服务器定义路由需要配置提供的HttpServerRoutes builder。示例如下:

    github.com/reactor/rea…

    import reactor.core.publisher.Mono;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    public class Application {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .route(routes ->
    				              routes.get("/hello",        //<1>
    				                        (request, response) -> response.sendString(Mono.just("Hello World!")))
    				                    .post("/echo",        //<2>
    				                        (request, response) -> response.send(request.receive().retain()))
    				                    .get("/path/{param}", //<3>
    				                        (request, response) -> response.sendString(Mono.just(request.param("param"))))
    				                    .ws("/ws",            //<4>
    				                        (wsInbound, wsOutbound) -> wsOutbound.send(wsInbound.receive().retain())))
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    <1> 提供一个为/helloGET路由,返回Hello World!字符串

    <2> 提供一个为/echoPOST路由,将接收到的请求的body数据返回回去。

    <3> 提供一个为/path/{param}GET路由,返回路径参数的值。

    <4> 提供一个websocket的/ws路由,将接收到的数据返回回去。

    服务器路由是唯一的,并且按照声明的顺序第一个匹配上的才会被调用。

    5.3.1.SSE

    下面的代码展示了如何配置HTTP服务器来提供Server-Sent Events服务:

    github.com/reactor/rea…

    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufAllocator;
    import org.reactivestreams.Publisher;
    import reactor.core.publisher.Flux;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    import reactor.netty.http.server.HttpServerRequest;
    import reactor.netty.http.server.HttpServerResponse;
    import java.io.ByteArrayOutputStream;
    import java.nio.charset.Charset;
    import java.time.Duration;
    import java.util.function.BiFunction;
    public class Application {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .route(routes -> routes.get("/sse", serveSse()))
    				          .bindNow();
    		server.onDispose()
    		      .block();
    	 * Prepares SSE response
    	 * The "Content-Type" is "text/event-stream"
    	 * The flushing strategy is "flush after every element" emitted by the provided Publisher
    	private static BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> serveSse() {
    		Flux<Long> flux = Flux.interval(Duration.ofSeconds(10));
    		return (request, response) ->
    		        response.sse()
    		                .send(flux.map(Application::toByteBuf), b -> true);
    	 * Transforms the Object to ByteBuf following the expected SSE format.
    	private static ByteBuf toByteBuf(Object any) {
    		ByteArrayOutputStream out = new ByteArrayOutputStream();
    		try {
    			out.write("data: ".getBytes(Charset.defaultCharset()));
    			MAPPER.writeValue(out, any);
    			out.write("\n\n".getBytes(Charset.defaultCharset()));
    		catch (Exception e) {
    			throw new RuntimeException(e);
    		return ByteBufAllocator.DEFAULT
    		                       .buffer()
    		                       .writeBytes(out.toByteArray());
    	private static final ObjectMapper MAPPER = new ObjectMapper();
    

    5.3.2.静态资源

    下面的代码展示了怎么配置HTTP服务器来提供静态资源:

    github.com/reactor/rea…

    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    import java.net.URISyntaxException;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    public class Application {
    	public static void main(String[] args) throws URISyntaxException {
    		Path file = Paths.get(Application.class.getResource("/logback.xml").toURI());
    		DisposableServer server =
    				HttpServer.create()
    				          .route(routes -> routes.file("/index.html", file))
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    5.4.写出数据

    如果要发送数据到一个已连接的客户端,您必须使用handler(…)或者route(…)来添加一个I/O处理器。这个I/O处理器可以通过HttpServerResponse来写出数据。下面是使用handle(…)方法的例子:

    github.com/reactor/rea…

    import reactor.core.publisher.Mono;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    public class Application {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .handle((request, response) -> response.sendString(Mono.just("hello"))) //<1>
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    <1> 给已连接的客户端发送hello字符串

    5.4.1.添加Headers或其他元数据

    当您给已连接的客户端发送数据的时候,您可能想发送额外的headers,cookies,status code和其他的元数据。您可以使用HttpServerResponse来添加这些额外的元数据。示例如下:

    github.com/reactor/rea…

    import io.netty.handler.codec.http.HttpHeaderNames;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import reactor.core.publisher.Mono;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    public class Application {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .route(routes ->
    				              routes.get("/hello",
    				                  (request, response) ->
    				                      response.status(HttpResponseStatus.OK)
    				                              .header(HttpHeaderNames.CONTENT_LENGTH, "12")
    				                              .sendString(Mono.just("Hello World!"))))
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    5.4.2.压缩

    您可以通过配置HTTP服务器,根据请求头Accept-Encoding来返回一个压缩的响应体。

    Reactor Netty提供了三种不同的策略来压缩传出的数据:

  • compress(boolean):根据所提供的布尔值,true为启用压缩,false为禁用压缩。
  • compress(int):当响应体大小超过了给定的值就会将响应体进行压缩(单位:字节)。
  • compress(BiPredicate<HttpServerRequest, HttpServerResponse>):如果predicate返回true则压缩。
  • 下面的是使用compress方法(设置为true)来启用压缩的例子:

    github.com/reactor/rea…

    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    import java.net.URISyntaxException;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    public class Application {
    	public static void main(String[] args) throws URISyntaxException {
    		Path file = Paths.get(Application.class.getResource("/logback.xml").toURI());
    		DisposableServer server =
    				HttpServer.create()
    				          .compress(true)
    				          .route(routes -> routes.file("/index.html", file))
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    5.5.消费数据

    如果要接收从连接的客户端发过来的数据,您必须使用handler(…)或者route(…)来添加一个I/O处理器。这个I/O处理器可以通过HttpServerRequest来读数据。

    下面是使用handle(…)方法的例子:

    github.com/reactor/rea…

    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    public class Application {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .handle((request, response) -> request.receive().then()) //<1>
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    <1> 接收从已连接的客户端发过来的数据

    5.5.1.读取Headers,URI参数和其他元数据

    当您从已连接的客户端接收数据的时候,您可能需要检查请求头,参数和其他的元数据。您可以通过HttpServerRequest来获取这些元数据。示例如下:

    github.com/reactor/rea…

    import reactor.core.publisher.Mono;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    public class Application {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .route(routes ->
    				              routes.get("/{param}",
    				                  (request, response) -> {
    				                      if (request.requestHeaders().contains("Some-Header")) {
    				                          return response.sendString(Mono.just(request.param("param")));
    				                      return response.sendNotFound();
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    获取远程(客户端)地址

    您可以从请求数据中获取的额外的元数据,您也可以拿到host(server)的地址,remote(client)的地址和schema。根据选择的工厂方法,您可以直接从channel拿到这些信息或者从Forwarded或者X-Forwarded-* HTTP请求头中拿到这些信息。示例如下:

    github.com/reactor/rea…

    import reactor.core.publisher.Mono;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    public class Application {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .forwarded(true) //<1>
    				          .route(routes ->
    				              routes.get("/clientip",
    				                  (request, response) ->
    				                      response.sendString(Mono.just(request.remoteAddress() //<2>
    				                                                           .getHostString()))))
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    <1> 如果可能的话,从Forwarded和X-Forwarded-* HTTP请求头中获取连接的相关信息。

    <2> 从远(客户端)端返回地址。

    也可以自定义ForwardedX-Forwarded-*头处理器。下面的例子展示怎么做:

    github.com/reactor/rea…

    import java.net.InetSocketAddress;
    import reactor.core.publisher.Mono;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    import reactor.netty.transport.AddressUtils;
    public class CustomForwardedHeaderHandlerApplication {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .forwarded((connectionInfo, request) -> {  //<1>
    				              String hostHeader = request.headers().get("X-Forwarded-Host");
    				              if (hostHeader != null) {
    				                  String[] hosts = hostHeader.split(",", 2);
    				                  InetSocketAddress hostAddress = AddressUtils.createUnresolved(
    				                      hosts[hosts.length - 1].trim(),
    				                      connectionInfo.getHostAddress().getPort());
    				                  connectionInfo = connectionInfo.withHostAddress(hostAddress);
    				              return connectionInfo;
    				          .route(routes ->
    				              routes.get("/clientip",
    				                  (request, response) ->
    				                      response.sendString(Mono.just(request.remoteAddress() //<2>
    				                                                           .getHostString()))))
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    <1> 添加一个自定义头处理器。

    <2> 从远(客户端)端返回地址。

    5.5.2.HTTP请求解码器

    默认情况下,Netty对收到的请求配置了一些限制条件,例如:

    初始行的最大长度。

    所有头的最大长度。

    content或每个chunk的最大长度。

    更多的信息请查看HttpRequestDecoderHttpServerUpgradeHandler

    默认情况下,HTTP服务器配置如下:

    ./../../reactor-netty-http/src/main/java/reactor/netty/http/HttpDecoderSpec.java

    public static final int DEFAULT_MAX_INITIAL_LINE_LENGTH = 4096;
    public static final int DEFAULT_MAX_HEADER_SIZE         = 8192;
    public static final int DEFAULT_MAX_CHUNK_SIZE          = 8192;
    public static final boolean DEFAULT_VALIDATE_HEADERS    = true;
    public static final int DEFAULT_INITIAL_BUFFER_SIZE     = 128;
    

    ./../../reactor-netty-http/src/main/java/reactor/netty/http/server/HttpRequestDecoderSpec.java

    * The maximum length of the content of the HTTP/2.0 clear-text upgrade request. * By default the server will reject an upgrade request with non-empty content, * because the upgrade request is most likely a GET request. public static final int DEFAULT_H2C_MAX_CONTENT_LENGTH = 0;

    当您需要改变这些配置的时候,您可以通过如下方式配置HTTP服务器:

    github.com/reactor/rea…

    import reactor.core.publisher.Mono;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    public class Application {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .httpRequestDecoder(spec -> spec.maxHeaderSize(16384)) //<1>
    				          .handle((request, response) -> response.sendString(Mono.just("hello")))
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    <1> 全部头的最大长度为16384。当超过这个值的时候会引发TooLongFrameException

    5.6.生命周期回调

    下面的生命周期回调用参数是提供给您用来扩展HttpServer的:

    CallbackDescription
    doOnBind当服务器channel即将被绑定的时候调用。
    doOnBound当服务器channel已经被绑定的时候调用。
    doOnChannelInit当channel初始化的时候被调用。
    doOnConnection当一个远程客户端连接上的时候被调用。
    doOnUnbound当服务器channel解绑的时候被调用。

    下面是使用doOnConnectiondoOnChannelInit回调的例子:

    github.com/reactor/rea…

    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    import java.util.concurrent.TimeUnit;
    public class Application {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .doOnConnection(conn ->
    				              conn.addHandler(new ReadTimeoutHandler(10, TimeUnit.SECONDS))) //<1>
    				          .doOnChannelInit((observer, channel, remoteAddress) ->
    				              channel.pipeline()
    				                     .addFirst(new LoggingHandler("reactor.netty.examples"))) //<2>
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    <1> 当一个远程客户端连接的时候添加了一个ReadTimeoutHandlerNetty pipeline。

    <2> 当初始化channel的时候添加了一个LoggingHandlerNetty pipeline。

    5.7.TCP层的配置

    当您需要修改TCP层的配置的时候,您可以使用以下代码段来扩展默认的TCP服务器配置:

    github.com/reactor/rea…

    import io.netty.channel.ChannelOption;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    public class Application {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    关于TCP层配置的更多详情请查看TCP Server

    5.7.1.Wire Logger

    Reactor Netty提供了线路记录(wire logging)用来检查点对点的流量。默认情况下,线路记录是关闭的。如果想要开启它,您必须将日志reactor.netty.http.server.HttpServer的设置为DEBUG等级并且按如下方式进行配置:

    github.com/reactor/rea…

    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    public class Application {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .wiretap(true) //<1>
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    <1> 开启线路记录

    默认情况下,线路记录在输出内容的时候会使用AdvancedByteBufFormat#HEX_DUMP。您也可以通过配置HttpServer改为AdvancedByteBufFormat#SIMPLE或者AdvancedByteBufFormat#TEXTUAL

    github.com/reactor/rea…

    import io.netty.handler.logging.LogLevel;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    import reactor.netty.transport.logging.AdvancedByteBufFormat;
    public class Application {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) //<1>
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    <1> 开启线路记录并使用AdvancedByteBufFormat#TEXTUAL来输出内容。

    5.8.SSL和TLS

    当您需要使用SSL或者TLS的时候,可以使用下面列出来方式进行配置。默认情况下,如果OpenSSL可用的话,则使用SslProvider.OPENSSL。否则使用SslProvider.JDK。可以通过SslContextBuilder或者设置-Dio.netty.handler.ssl.noOpenSsl=true来进行切换。

    下面的是使用SslContextBuilder的例子:

    github.com/reactor/rea…

    import io.netty.handler.ssl.SslContextBuilder;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    import java.io.File;
    public class Application {
    	public static void main(String[] args) {
    		File cert = new File("certificate.crt");
    		File key = new File("private.key");
    		SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(cert, key);
    		DisposableServer server =
    				HttpServer.create()
    				          .secure(spec -> spec.sslContext(sslContextBuilder))
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    5.8.1.服务器名称标识

    您可以配置HTTP服务器的多个SslContext映射到一个特定的域。配置SNI映射时,可以使用确切的域名或包含通配符的域名。

    下面是使用包含通配符的域名的例子:

    github.com/reactor/rea…

    import io.netty.handler.ssl.SslContext;
    import io.netty.handler.ssl.SslContextBuilder;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    import java.io.File;
    public class Application {
    	public static void main(String[] args) throws Exception {
    		File defaultCert = new File("default_certificate.crt");
    		File defaultKey = new File("default_private.key");
    		File testDomainCert = new File("default_certificate.crt");
    		File testDomainKey = new File("default_private.key");
    		SslContext defaultSslContext = SslContextBuilder.forServer(defaultCert, defaultKey).build();
    		SslContext testDomainSslContext = SslContextBuilder.forServer(testDomainCert, testDomainKey).build();
    		DisposableServer server =
    				HttpServer.create()
    				          .secure(spec -> spec.sslContext(defaultSslContext)
    				                              .addSniMapping("*.test.com",
    				                                      testDomainSpec -> testDomainSpec.sslContext(testDomainSslContext)))
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    5.9.HTTP访问日志

    您可以以编程的方式或者配置的方式来启用HTTP访问日志。默认情况下,是禁用的

    您可以通过设置-Dreactor.netty.http.server.accessLogEnabled=true配置来启用HTTP访问日志。

    您可以使用下面的配置(用于Logback或类似的日志框架)来将HTTP访问日志单独分出来:

    <appender name="accessLog" class="ch.qos.logback.core.FileAppender">
        <file>access_log.log</file>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>
    <appender name="async" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="accessLog" />
    </appender>
    <logger name="reactor.netty.http.server.AccessLog" level="INFO" additivity="false">
        <appender-ref ref="async"/>
    </logger>
    

    下面的例子展示了以编程的方式开启HTTP访问日志:

    github.com/reactor/rea…

    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    public class Application {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .accessLog(true)
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    这个方法的优先级高于配置系统参数。

    默认情况下,日志格式是Common Log Format,但是您可以通过参数来自定义格式,示例如下:

    github.com/reactor/rea…

    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    import reactor.netty.http.server.logging.AccessLog;
    public class CustomLogAccessFormatApplication {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .accessLog(true, x -> AccessLog.create("method={}, uri={}", x.method(), x.uri()))
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    您也可以使用AccessLogFactory#createFilter方法来过滤HTTP访问日志,示例如下:

    github.com/reactor/rea…

    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    import reactor.netty.http.server.logging.AccessLogFactory;
    public class FilterLogAccessApplication {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .accessLog(true, AccessLogFactory.createFilter(p -> !String.valueOf(p.uri()).startsWith("/health/")))
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    请注意,这个方法也可以自定义格式,例如:

    github.com/reactor/rea…

    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    import reactor.netty.http.server.logging.AccessLog;
    import reactor.netty.http.server.logging.AccessLogFactory;
    public class CustomFormatAndFilterAccessLogApplication {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .accessLog(true, AccessLogFactory.createFilter(p -> !String.valueOf(p.uri()).startsWith("/health/"), //<1>
    						          x -> AccessLog.create("method={}, uri={}", x.method(), x.uri()))) //<2>
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    <1> 指定要使用的过滤predicate

    <2> 指定自定义日志格式

    5.10.HTTP/2

    默认情况下,HTTP服务器支持HTTP/1.1。如果您需要HTTP/2,您可以通过配置来实现。除了协议配置外,如果您需要H2而不是H2C明文)的话,还必须配置SSL。

    由于JDK8不支持 "开箱即用 "的应用层协议协商(ALPN)(尽管一些厂商将ALPN移植到JDK8),您需要添加额外的本地库依赖来支持它,例如:netty-tcnative-boringssl-static

    下面列出了一个简单的H2的例子:

    github.com/reactor/rea…

    import io.netty.handler.ssl.SslContextBuilder;
    import reactor.core.publisher.Mono;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.HttpProtocol;
    import reactor.netty.http.server.HttpServer;
    import java.io.File;
    public class H2Application {
    	public static void main(String[] args) {
    		File cert = new File("certificate.crt");
    		File key = new File("private.key");
    		SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(cert, key);
    		DisposableServer server =
    				HttpServer.create()
    				          .port(8080)
    				          .protocol(HttpProtocol.H2)                          //<1>
    				          .secure(spec -> spec.sslContext(sslContextBuilder)) //<2>
    				          .handle((request, response) -> response.sendString(Mono.just("hello")))
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    <1> 配置服务器仅支持HTTP/2

    <2> 配置SSL

    现在该应用的表现应该像如下这样:

    $ curl --http2 https://localhost:8080 -i
    HTTP/2 200
    hello
    

    下面列出了一个简单的H2C的例子:

    github.com/reactor/rea…

    import reactor.core.publisher.Mono;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.HttpProtocol;
    import reactor.netty.http.server.HttpServer;
    public class H2CApplication {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .port(8080)
    				          .protocol(HttpProtocol.H2C)
    				          .handle((request, response) -> response.sendString(Mono.just("hello")))
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    现在该应用的表现应该像如下这样:

    $ curl --http2-prior-knowledge http://localhost:8080 -i
    HTTP/2 200
    hello
    

    5.10.1.选择协议

    ./../../reactor-netty-http/src/main/java/reactor/netty/http/HttpProtocol.java

    public enum HttpProtocol {
    	 * The default supported HTTP protocol by HttpServer and HttpClient
    	HTTP11,
    	 * HTTP/2.0 support with TLS
    	 * <p>If used along with HTTP/1.1 protocol, HTTP/2.0 will be the preferred protocol.
    	 * While negotiating the application level protocol, HTTP/2.0 or HTTP/1.1 can be chosen.
    	 * <p>If used without HTTP/1.1 protocol, HTTP/2.0 will always be offered as a protocol
    	 * for communication with no fallback to HTTP/1.1.
    	 * HTTP/2.0 support with clear-text.
    	 * <p>If used along with HTTP/1.1 protocol, will support H2C "upgrade":
    	 * Request or consume requests as HTTP/1.1 first, looking for HTTP/2.0 headers
    	 * and {@literal Connection: Upgrade}. A server will typically reply a successful
    	 * 101 status if upgrade is successful or a fallback HTTP/1.1 response. When
    	 * successful the client will start sending HTTP/2.0 traffic.
    	 * <p>If used without HTTP/1.1 protocol, will support H2C "prior-knowledge": Doesn't
    	 * require {@literal Connection: Upgrade} handshake between a client and server but
    	 * fallback to HTTP/1.1 will not be supported.
    

    5.11.度量

    HTTP服务器支持与Micrometer的内置集成。它暴露了所有前缀为reactor.netty.http.server的度量。

    下面的表格提供了HTTP服务器度量的相关信息:

    度量名称类型描述
    reactor.netty.http.server.data.receivedDistributionSummary收到的数据量,以字节为单位
    reactor.netty.http.server.data.sentDistributionSummary发送的数据量,以字节为单位
    reactor.netty.http.server.errorsCounter发生的错误数量
    reactor.netty.http.server.data.received.timeTimer消费传入的数据所花费的时间
    reactor.netty.http.server.data.sent.timeTimer传出数据所花费的时间
    reactor.netty.http.server.response.timeTimer请求/响应的总时间

    下面额外的度量也是可用的:

    ByteBufAllocator度量

    度量名称类型描述
    reactor.netty.bytebuf.allocator.used.heap.memoryGauge堆内存的字节数
    reactor.netty.bytebuf.allocator.used.direct.memoryGauge堆外内存的字节数
    reactor.netty.bytebuf.allocator.used.heap.arenasGauge堆内存的个数(当使用PooledByteBufAllocator的时候)
    reactor.netty.bytebuf.allocator.used.direct.arenasGauge堆外内存的个数(当使用PooledByteBufAllocator的时候)
    reactor.netty.bytebuf.allocator.used.threadlocal.cachesGaugethreadlocal的缓存数量(当使用PooledByteBufAllocator的时候)
    reactor.netty.bytebuf.allocator.used.tiny.cache.sizeGauge微小缓存的大小(当使用PooledByteBufAllocator的时候)
    reactor.netty.bytebuf.allocator.used.small.cache.sizeGauge小缓存的大小(当使用PooledByteBufAllocator的时候)
    reactor.netty.bytebuf.allocator.used.normal.cache.sizeGauge一般缓存的大小(当使用PooledByteBufAllocator的时候)
    reactor.netty.bytebuf.allocator.used.chunk.sizeGauge一个区域的块大小(当使用PooledByteBufAllocator的时候)

    下面是开启集成的度量的例子:

    github.com/reactor/rea…

    import io.micrometer.core.instrument.Metrics;
    import io.micrometer.core.instrument.config.MeterFilter;
    import reactor.core.publisher.Mono;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    public class Application {
    	public static void main(String[] args) {
    		Metrics.globalRegistry //<1>
    		       .config()
    		       .meterFilter(MeterFilter.maximumAllowableTags("reactor.netty.http.server", "URI", 100, MeterFilter.deny()));
    		DisposableServer server =
    				HttpServer.create()
    				          .metrics(true, s -> {
    				              if (s.startsWith("/stream/")) { //<2>
    				                  return "/stream/{n}";
    				              else if (s.startsWith("/bytes/")) {
    				                  return "/bytes/{n}";
    				              return s;
    				          }) //<3>
    				          .route(r ->
    				              r.get("/stream/{n}",
    				                   (req, res) -> res.sendString(Mono.just(req.param("n"))))
    				               .get("/bytes/{n}",
    				                   (req, res) -> res.sendString(Mono.just(req.param("n")))))
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    <1> 设置带有URI标签的仪表的上限

    <2> 在可能的情况下,模板化的URI将被用作URI标签的值。

    <3> 启用内置集成的Micrometer

    为了避免启用度量而造成的内存和CPU的开销,如果可以的话,将真实的URI转换为模板化的URI是很重要的。如果不转换为类似于模板的形式,则会导致每一个不同的URI都会创建一个不同的标签,这样度量会占用大量的内存。

    始终对带有URI标签的仪表设置一个上限。给仪表配置一个上线以防真实的URI不能被模板化。您可以在maximumAllowableTags找更多的信息。

    如果您想让HTTP服务端度量与除了Micrometer之外的系统集成或者想提供自己与Micrometer的集成来添加自己的度量记录器,您可以按如下方式实现:

    github.com/reactor/rea…

    import reactor.core.publisher.Mono;
    import reactor.netty.DisposableServer;
    import reactor.netty.channel.ChannelMetricsRecorder;
    import reactor.netty.http.server.HttpServer;
    import java.net.SocketAddress;
    import java.time.Duration;
    public class Application {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .metrics(true, CustomHttpServerMetricsRecorder::new) //<1>
    				          .route(r ->
    				              r.get("/stream/{n}",
    				                   (req, res) -> res.sendString(Mono.just(req.param("n"))))
    				               .get("/bytes/{n}",
    				                   (req, res) -> res.sendString(Mono.just(req.param("n")))))
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    <1> 开启HTTP服务端度量并且提供HttpServerMetricsRecorder的实现。

    5.12.Unix域套接字

    当使用本地传输时,HTTP服务器支持Unix域套接字(UDS)。

    下面是使用UDS的例子:

    github.com/reactor/rea…

    import io.netty.channel.unix.DomainSocketAddress;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;
    public class Application {
    	public static void main(String[] args) {
    		DisposableServer server =
    				HttpServer.create()
    				          .bindAddress(() -> new DomainSocketAddress("/tmp/test.sock")) //<1>
    				          .bindNow();
    		server.onDispose()
    		      .block();
    

    <1> 指定将使用的DomainSocketAddress

    Suggest Edit to "HTTP Server"

    Reactor Netty参考指南目录