static final int DEFAULT_BUFFER_SIZE = 256 ;
由于消费的比较慢(我们手动 Sleep 了 2 秒) this.subscription.request(1)
只能一条一条消费,所以效果就是一条一条消费,消费一条,生产一条。
最好自己在本地跑一下代码,感受一下 backpressure
R2DBC
Reactive 本来不支持 JDBC。最根本的原因是,JDBC 不是 non-blocking 设计。不过这个事情也正在推进和发展过程中,比如 r2dbc ( r2dbc.io/ ) 项目。
R2DBC 是 Reactive Relational Database Connectivity 的首字母缩写词。 R2DBC 是一个 API 规范倡议,它声明了一个响应式 API,由驱动程序供应商实现,并以响应式编程的方式访问他们的关系数据库。
R2DBC
基于Reactive Streams
反应流规范,它是一个开放的规范,为驱动程序供应商和使用方提供接口(r2dbc-spi
),与JDBC
的阻塞特性不同,它提供了完全反应式的非阻塞API
与 [关系型数据库] 交互。
目前 R2DBC 支持的数据库如下:
cloud-spanner-r2dbc - driver for Google Cloud Spanner.
jasync-sql - R2DBC wrapper for Java & Kotlin Async Database Driver for MySQL and PostgreSQL (written in Kotlin).
oracle-r2dbc - native driver implemented for Oracle.
r2dbc-h2 - native driver implemented for H2 as a test database.
r2dbc-mariadb - native driver implemented for MariaDB.
r2dbc-mssql - native driver implemented for Microsoft SQL Server.
r2dbc-mysql - native driver implemented for MySQL.
r2dbc-postgresql - native driver implemented for PostgreSQL.
很多同学可能会关心事务的事儿,R2DBC 也是支持事务的
什么是 Spring-WebFlux
相信有了前文对 Reactive 的铺垫,了解 Spring-WebFlux 会比较容易了。
Spring 5.0 添加了 Spring-WebFlux 模块将默认的 web 服务器改为 Netty,支持 Reactive 应用,它的特点是:
完全非阻塞 式的(non-blocking)
支持 Reactive Stream 背压(Reactive Streams back pressure)
运行在 Netty, Undertow, and Servlet 3.1+ 容器
对比 Spring MVC
Spring MVC 构建于 Servlet API 之上,使用的是同步阻塞式 I/O 模型,什么是同步阻塞式 I/O 模型呢?就是说,每一个请求对应一个线程去处理。
Spring WebFlux 是一个异步非阻塞式 IO 模型,通过少量的容器线程就可以支撑大量的并发访问,所以 Spring WebFlux 可以有效提升系统的吞吐量和伸缩性,特别是在一些 IO 密集型应用中,Spring WebFlux 的优势明显。例如微服务网关 Spring Cloud Gateway 就使用了 WebFlux,这样可以有效提升网管对下游服务的吞吐量。
Spring WebFlux 与 Spring MVC 的关系如下图
可见,Spring WebFlux 并不是为了替代 Spring MVC 的,它与 Spring MVC 一起形成了两套 WEB 框架。两套框架有交集比如对 @Controller
注解的使用,以及均可使用 Tomcat、Jetty、Undertow 作为 Web 容器。
Spring MVC 还是 WebFlux?
针对这个问题,官方认为两者并不是二元对立的,他们可以并排使用,两者一起工作以扩大可用选项的范围。
我们来看看官方给的具体建议:
如果已经有了一个运行良好的 SpringMVC 应用程序,则无需更改。命令式编程是编写、理解和调试代码的最简单方法,我们可以选择最多的库,因为从历史上看,大多数都是阻塞的。
如果是个新应用且决定使用 非阻塞 Web 技术栈,那么 WebFlux 是个不错的选择。
对于使用 Java8 Lambda 或者 Kotlin 且 要求不那么复杂的小型应用程序或微服务来说,WebFlux 也是一个不错的选择
在微服务架构中,可以混合使用 SpringMVC 和 Spring WebFlux,两个都支持基于注解的编程模型
评估应用程序的一种简单方法是检查其依赖关系。如果要使用阻塞持久性 API(JPA、JDBC)或网络 API,那么 Spring MVC 至少是常见架构的最佳选择
如果有一个调用远程服务的 Spring MVC 应用程序,请尝试响应式WebClient
对于一个大型团队,向非阻塞、函数式和声明式编程转变的学习曲线是陡峭的。在没有全局开关的情况下,想启动 WebFlux,可以先使用 reactive WebClient
。此外,从小处着手并衡量收益。我们预计,对于广泛的应用,这种转变是不必要的。
这里最后一点的意思是要仔细通过技术原理(非阻塞 IO、并发性能、吞吐量..)来评估 WebFlux 究竟能为我们带来多少益处,同时评估为了获得这些好处所要付出的学习和改造成本,然后衡量收益,如果收益大值得一试,否则不建议动。
个人认为对于日常用 SpringMVC 开发的业务应用不用换 Spring-WebFlux,因为 SpringMVC 是同步阻塞式模型,对于应用的开发、调试、测试都比较友好,反过来这些点在非阻塞模型的 WebFlux 中就变成了缺点。
为什么要用 WebFlux
为什么要用 WebFlux ?或者换句话说 WebFlux 有什么优点?
首先是吞吐量
随着每个请求的被处理时间越长、并发请求的量级越大,WebFlux 相比 SpringMVC 的整体吞吐量高的越多,平均 的请求响应时间越短。如下图所示
吞吐量大了,意味着单位时间内可以处理的请求数变多了,比如原来 1w 个请求 10 秒处理完,现在 10w 个请求也是 10 秒处理完,就代表吞吐上去了。注意,是吞吐上去了,不代表单次请求快了,单次请求的速度和原来一样 。
传统阻塞 IO 模型 的不足包括
每个连接都需要独立线程处理,当并发数大时,创建线程数多,占用资源
采用阻塞 IO 模型,连接建立后,若当前线程没有数据可读,线程会阻塞在读操作上,造成资源浪费
针对传统阻塞 IO 模型的两个问题,可以采用如下的方案
基于池化思想,避免为每个连接创建线程,连接完成后将业务处理交给线程池处理
基于 IO 复用模型,多个连接共用同一个阻塞对象,不用等待所有的连接。遍历到有新数据可以处理时,操作系统会通知程序,线程跳出阻塞状态,进行业务逻辑处理
Netty 所用的 Reactor 线程模型,就解决了阻塞 IO 的问题,具体来讲,它使用的是主从 Reactor 多线程模型
同时 Netty 自身也很好地利用了 IO 多路复用、epoll 优化、零拷贝等技术,极大程度上优化了 IO 的性能。我们知道 SpringWebFlux 底层也依赖了 Netty ,所以也获得了 Netty 带来的优势。这一点可以概括为应用的弹性或伸缩性 。根据实际请求量的大小进行资源的伸缩。
Mono Flux
前提:这里的例子使用的框架是 SpringBoot ,版本为 2.3.12.RELEASE
相应的 Spring 的大版本是 5,JDK 11
我们用两个最简单的例子,演示下用 Spring WebFlux 怎么写 Web 的 controller
当然首先要添加相关依赖
<dependency >
<groupId > org.springframework.boot</groupId >
<artifactId > spring-boot-starter-webflux</artifactId >
</dependency >
@RestController
@RequestMapping("/webflux")
public class HelloController {
@GetMapping("/hello")
public Mono<String> hello () {
return Mono.just("Hello Spring Webflux" );
> curl http://localhost:8080/webflux/hello
Hello Spring Webflux
我们再来一个返回对象列表的例子:
@GetMapping("/posts")
public Flux<Post> posts () {
WebClient webClient = WebClient.create();
Flux<Post> postFlux = webClient.get().uri("http://jsonplaceholder.typicode.com/posts" ).retrieve().bodyToFlux(Post.class);
return postFlux;
@NoArgsConstructor
@Data
public class Post {
private Integer userId;
private Integer id;
private String title;
private String body;
浏览器请求 http://localhost:8080/webflux/posts
,得到
解释一下这个例子,WebClient 是 Spring5 以后提供的,可以替代 RestTemplate,我们利用 WebClient 请求 jsonplaceholder 提供的 json 对象数组,将返回的结果映射成为 Post 对象,然后直接将 Post 对象列表返回给客户端。
有关 WebClient 的具体 API 这里先不做过多解释,我们看一下 Mono
和 Flux
这两个陌生的类。在 WebFlux 中 他们均能充当响应式编程中发布者的角色,不同的是:
Mono
:返回 0 或 1 个元素,即单个对象。
Flux
:返回 N 个元素,即 List 列表对象。
此外,在应用启动后,通过 IDEA 的控制台可以明显看到 Server 已经不是 Tomcat 了,而是 Netty
如果你没有看到 Netty 还是 Tomcat 的话,可能是你的pom.xml
中同时包含了以下两个依赖:
<dependency >
<groupId > org.springframework.boot</groupId >
<artifactId > spring-boot-starter-web</artifactId >
</dependency >
<dependency >
<groupId > org.springframework.boot</groupId >
<artifactId > spring-boot-starter-webflux</artifactId >
</dependency >
解决的方案是去掉 spring-boot-starter-web
依赖,这样 Server 就切换到了 Netty。
Stream
既然叫 Reactive Stream 我们就用下面一个例子 找一找流的感觉:
@GetMapping(value = "/flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> flux () {
Flux<String> flux = Flux.fromArray(new String []{"a" , "b" , "c" , "d" }).map(s -> {
try {
Thread.sleep(1000 );
} catch (InterruptedException e) {
e.printStackTrace();
return "<letter:" + s + ">" ;
return flux;
看下效果:
浏览器每隔一秒显示下一条数据,这正是流的效果,是不是有点儿像 WebSocket ?其实并不完全一样。奥妙在这里 text/event-stream
,这其实也是服务器向浏览器推送消息的一种方案。感兴趣的同学可以搜索一下 WebSocket 和 SSE 的区别。
Spring MVC 的前端控制器是 DispatcherServlet
, 而 WebFlux 是 DispatcherHandler
,它实现了 WebHandler
接口,主要看 handle 方法
有文章说,WebFlux 还不支持 MySQL
,可能是文章发布的较早,当时 R2DBC 没来得及支持那么多的数据库,只支持了 MongoDB 和 PostgreSQL 等几个。现在这个时间点 R2DBC 支持的数据库就比较多了,也包含了 MySQL(参数上文)
Spring Data R2DBC 可以与 Spring Data JPA 结合使用,其实 R2DBC 与原来的 JPA 使用方式差别不大,使用非常简单。
只是 Spring Data JPA 中方法返回的是真实的值,而 R2DBC 中,返回的是数据流Mono
,Flux
。
更多 R2DBC 的介绍,可以参考 Spring 的官方文档:docs.spring.io/spring-data…
有兴趣的同学可以用 Spring WebFlux + R2DBC+MySQL ,实现一下 CRUD 操作。就是一个从头到尾彻彻底底的响应式非阻塞应用。
WebClient
Spring 5 引入了新的 WebClientAPI,取代了现有的 RestTemplate 客户端。使用 WebClient 您可以使用功能流畅的 API 发出同步或异步 HTTP 请求,该 API 可以直接集成到您现有的 Spring 配置和 WebFlux 反应式框架中。
一个例子:
private static void testWebClient () {
WebClient webClient = WebClient.create();
monoTest(webClient, "http://jsonplaceholder.typicode.com/posts/1" );
* 从 API 获取单个帖子
private static void monoTest (WebClient webClient, String uri) {
Mono<Post> postMono = webClient.get().uri(uri).retrieve().bodyToMono(Post.class);
Post post1 = postMono.blockOptional().get();
log.info(post1.getTitle());
上面的是 Mono 的,再来一个 Flux 的(获得 post list 并将 id 求和):
* 获取 post 列表 ,使用 Flux 因为是多值 , 要是获取一个对象比如 `posts/1` 就可以用 Mono
* @param webClient
* @param uri
private static void fluxTest (WebClient webClient, String uri) {
Flux<Post> postFlux = webClient.get().uri(uri).retrieve().bodyToFlux(Post.class);
List<Post> posts = postFlux.collectList().block();
Integer idSum = posts.stream().mapToInt(post -> post.getId()).reduce(0 , (a, b) -> a + b);
log.info(idSum);
Reactive Programming
作为观察者模式(Observer) 的延伸,不同于传统的命令编程方式( Imperative programming)同步拉取数据的方式,如迭代器模式(Iterator) 。而是采用数据发布者同步或异步地推送到数据流(Data Streams)的方案。
当该数据流(Data Steams)订阅者监听到传播变化时,立即作出响应动作。在实现层面上,Reactive Programming 可结合函数式编程简化面向对象语言语法的臃肿性,屏蔽并发实现的复杂细节,提供数据流的有序操作,从而达到提升代码的可读性,以及减少 Bugs 出现的目的。同时,Reactive Programming
结合背压(Backpressure)的技术解决发布端生成数据的速率高于订阅端消费的问题。
如果说 Spring Cloud 是从【宏观系统层面的开发】角度在实践健壮的高可用系统+系统运维,K8S 在【DEV OPS】层面实践更好的系统运维,Service Mesh 在【基础设施层(infra)】实践健壮的高可用系统+系统运维,那么 WebFlux(包括整个 Reactive Stack 体系的其他成员)就是从【微观项目层面的开发】角度在实践健壮的高可用系统+系统运维。或多或少,它们都从各个维度在朝着“更少的人治”角度去努力。
GitHub 地址
代码示例我放到了 github 上 :github.com/xiaobox/Spr…
juejin.cn/post/684490…
www.jianshu.com/p/15d0a2bed…
www.jdon.com/56547
segmentfault.com/a/119000001…
www.reactivemanifesto.org/
www.yisu.com/zixun/96685…
mp.weixin.qq.com/s/BfgQ760h_…
docs.spring.io/spring-data…
tech.io/playgrounds…
4415
_PhoenixWong_
Spring
1543
amandakelake
React.js
JavaScript
React Native