探究WebFlux之Reactor
上篇文章主要介绍了响应式流,也提到了非常著名的Reactor,因为webflux响应式的实现底层引用的核心库就是reactor,所以我们 有必要去了解和使用reactor,这样才能在响应式编程里面如鱼得水。本篇文章 需要了解 java 8 lambada和响应式编程的一些概念,具体可以看前两篇文章
这篇文章会带主要讲的是reactor的基本操作,因为时间和字数限制,一篇文章把所有的东西讲出来也不太可能,所以只会讲一些便于初次接触响应式编程的新手快速入门,想要更加全面了解reactor的东西可以去reactor的文档。
1、引入依赖
1.1 maven
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.5.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
<version>3.3.5.RELEASE</version>
</dependency>
</dependencies>
1.2 gradle
implementation 'io.projectreactor:reactor-core:3.3.5.RELEASE'
implementation 'io.projectreactor:reactor-test:3.3.5.RELEASE'
2、基本概念
2、简介
在reactor中有两个最基本的概念,发布者和订阅者,可以理解为生产者和消费者的概念。在Reactor中发布者有两个,一个是 Flux ,一个是 Mono。 Flux代表的是0-N个元素的响应式序列,而Mono代表的是0-1个的元素的结果。
可能你会觉的很奇怪,既然Flux可以发布0-N个元素 也包括了1个元素,那为什么还要Mono这个东西,举个例子 一个接口 有且只会返回一个元素,你还会去用List包装吗,或者是一次http请求 只会返回一个响应,既然已经确定了他只会返回一个元素,那么为啥还有用多个取表示返回呢,这是语义上规范差别。其实这种规范是为了避免一些只对Flux多个元素的操作,无意义的在只有一个元素Flux上操作,用Mono这种语义上的规范去杜绝这种问题。
来看看Flux和Mono怎么创建自己的序列
//声明的参数就是数据流的元素
Flux.just(1,2,3);
Mono.just(1);
当然也有其他的生成方式
Flux.fromArray(new String[]{"a","b","c"});
Flux.fromStream(new ArrayList<>().stream());
对于 Flux和Mono来说,just 是数据完成的信号,那如果不是通过just声明的数据流,没有这种数据准备完成的信号,那么这个流就是一个无限流。除了我们手动声明数据准备的完成,错误信号也标志这整个流的完成。
Flux.error(new RuntimeException());
还有一种情况就是当Flux和Mono没有发出任何一个元素,而是直接发出了完成信号,那么这个流就是一个空的流,像这样。
Flux.error(new RuntimeException());
Flux.just();
Flux.empty();
那么既然有空的流,那一定有应用的场景,如果我们db采用的是响应式驱动的库,在查询数据的时候会有查询为空的情况,这个时候返回的就是空的流。
还有很重要的一点就是 Flux.just(1,2,4) 只是定义了一个数据流而已,在 subscribe() 之前的操作什么也不会发生,这一点让我想起了java 8 stream的惰性求值,在中止操作之前的操作都不会触发(stream 惰性求值的原理我已经在前面的文章分析过了,文章链接就在顶部)。
例如我们要打印刚刚声明的数据流,需要这样做
Flux.just(1, 2, 3).subscribe(System.out::println);
另外 subscribe 时,还可以指定错误的回调处理,以及数据处理完的完成回调
所以Subscribe 可以这样写 更加完善
Flux.error(new Exception("error")).subscribe(
System.out::println,
System.err::println,
() -> System.out.println("Completed!")
);
3、如何调试reactor
在我们传统阻塞代码里面,调试(Debug)的时候是一件非常简单的事情,通过打断点,得到相关的stack 的信息,就可以很清楚的知道错误信息(不过在多线程的环境下去打断点,需要切换线程环境,也有点麻烦)。
但是在reactor 环境下去调试代码并不是一件简单的事情,最常见的就是 一个 Flux流,怎么去得到每个元素信息,怎么去知道在管道里面下一个元素是什么,每个元素是否像期望的那样做了操作。所以这也是从传统编程切换到响应式编程的难点,开发人员需要花时间去学习这个操作,但是感觉难受总是好的,因为做什么都太容易的话,自己会长期止步于此,像早期的EJB到j2ee,ssh -> ssm -> spring boot -> spring cloud ,从微服务->service mesh -> serve less ,到现在一些一线大厂盛行的中台。也许这一次就是改变自己的时候。
言归正传,关于比较复杂的调试后期再说,我们先从最基本的单元测试开始。官方推荐的工具是 StepVerifier, 就在上面引入依赖的test包里面,来简单看一个示例。
@Test
public void reactorTest(){
StepVerifier.create(Flux.just(1,2)) //1
.expectNext(1,2) //2
.expectComplete() //3
.verify(); //4
}
- 创建测试的异步流
- 测试期望的值
- 测试是否完成
- 验证
这里先看一个简单的单元测试,后续会详细讲解reactor环境下的测试方法,我们首先要熟悉的是reactor的一些常用的api(这里需要读者有lambada 的基础)
3.1 map
这里的map和java 8 stream的map是同一个意思,用于元素的转换,像这样
@Test
public void reactorMapTest(){
StepVerifier.create(Flux.just(1,2)
.map(v->v+1))
.expectNext(2,3)
.expectComplete()
.verify();
}
还是之前的代码,只是对每一个元素都自增加一,这里就不多说了,对lambada熟悉的同学都了解。
3.2 flatmap
flatmap也是对元素的转换,但是不同的是flatmap是将元素转换为流,再将流合并为一个大的流。
@Test
public void reactorFlatMapTest(){
StepVerifier.create(Flux.just("crabman","is","hero")
.flatMap(v->Flux.fromArray(v.split("")))
.doOnNext(System.out::println))
.expectNextCount(13)
.verifyComplete();
}
tips :flatmap 和 map的区别
从源码上来看 map就是一个function函数,输入一个输出一个,对于flatmap来讲它接受的是输出为Publisher的function,也就是说对于flatmap来讲 输入一个值,输出的是一个Publisher,所以map是一对一的关系,而flatmap 是一对多或者多对多的关系,并且两者输出也不一样。那flatmap 的应用场景在哪里,例如一个接口,入参是List<id>,用户id 的集合,需求是返回每个id 对应的具体信息,所以代码看起来就是这样 xx.flatmap(id->getUserInfoById(id))
3.3 filter
reactor 的filter和java 8 stream 的filter是一样的,就不多说了,这里过滤掉值为2 的
@Test
public void reactorFilterTest(){
StepVerifier.create(Flux.just(1,2)
.map(v->v+1)
.filter(s->s!=2)
.doOnNext(System.out::println))
.expectNext(3)
.expectComplete()
.verify();
}
3.4 zip
这个是操作可能看起来比较陌生,顾名思义,“压缩”就是将多个流一对一的合并起来,还有一个原因,因为在每个flux流或者mono流里面,各个流的速度是不一样,zip还有个作用就是将两个流进行同步对齐。例如我们这里在加入另一个流,这个流会不停的发出元素,为了让大家可以感受到同,这里限制另一个流的速度为没1秒发出一个元素,这样合并的流也会向另一个流对齐。
@Test
public void reactorZipTest(){
//定义一个Flux流
Flux<String> stringFlux = Flux.just("a", "b", "c", "d");
//这里使用计时器,因为在单元测试里面,可能元素没执行完,他就会直接返回
CountDownLatch countDownLatch = new CountDownLatch(1); // 2
Flux.zip(stringFlux,Flux.interval(Duration.ofSeconds(1)))
.subscribe(t->System.out.println(t.getT1())
,System.err::println
,countDownLatch::countDown);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
上面讲的这四个是比较常用的,还有很多,由于篇幅限制所以没有办法全部罗列出来,感兴趣的可以去官网文档看更多的api
4、Reactor中的多线程
在我们java的传统的编程中,对于线程之间的调度有封装好的线程池工具类供我们使用,或者我们可以通过线程池的构造函数定义自己的线程池,这一切都让多线程的调度都变得很容易,那么在reactor中怎么处理线程的调度
4.1 Schedulers
在reactor中处理线程调度的不叫thread pool,而是Schedulers(调度器),通过调度器就可以创建出供我们直接使用的多线程环境。
4.1.1 Schedulers.immediate()
在当前线程中使用
4.1.2 Schedulers.single()
创建一个可重用的单线程环境,该方法的所有调用者都会重复使用同一个线程。
4.1.3 Schedulers.elastic()
创建一个弹性线程池,会重用空闲线程,当线程池空闲时间过长就会自动废弃掉。通常使用的场景是给一个阻塞的任务分配自己的线程,从而不会影响到其他任务的执行。
4.1.4 Schedulers.parallel()
创建一个固定大小的线程池,线程池大小和cpu个数相等。
来看一个具体使用的实例,通过 Schedulers.elastic() 将一个同步阻塞的方法改写成异步的。
private Integer syncMethod(){
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
return 123456;
@Test
public void switchSyncToAsyncTest(){
CountDownLatch countDownLatch = new CountDownLatch(1);
Mono.fromCallable(()->syncMethod())
.subscribeOn(Schedulers.elastic())
.subscribe(System.out::println,null,countDownLatch::countDown);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
简单分析上述代码,通过fromCallable声明 一个callable 的mono,然后通过subscribeOn 切换环境,调度任务到单独的弹性线程池工作。
5、错误处理
在传统的编程中,我们处理单个接口错误的方式,可能是 try-catch-finally的方式,也可能是try-winth-resource的语法糖,这些在reactor中变得不太一样。下面来说一说reactor中的几种错误处理方式。
5.1 onErrorReturn
onErrorReturn在发生错误的时候,会提供一个缺省值,类似于安全取值的问题,但是这个在响应式流里面会更加实用。
Flux.just(1,2,0)
.map(v->2/v)
.onErrorReturn(0)
.map(v->v*2)
.subscribe(System.out::println,System.err::println);
这样就可以在处理一些未知元素的时候,又不想让未知因素中止程序的继续运行,就可以采取这种方式。
5.2 onErrorResume
在发生错误的时候,提供一个新的流或者值返回,这样说可能不太清楚,看代码。
Flux.just(1,2,0)
//调用redis服务获取数据
.flatMap(id->redisService.getUserByid(id))
//当发生异常的时候,从DB用户服务获取数据
.onErrorResume(v->userService.getUserByCache(id));
类似于错误的一个callback;
5.3 onErrorMap
上面的都是我们去提供缺省的方法或值处理错误,但是有的时候,我们需要抛出错误,但是需要将错误包装一下,可读性好一点,也就是抛出自定义异常。
Flux.just(1,2,0)
.flatMap(id->getUserByid(id))
.onErrorMap(v->new CustomizeExcetion("服务器开小差了",v));
5.4 doOnError 记录错误日志
在发生错误的时候我们需要记录日志,在reactor里面专门独立出api记录错误日志
Flux.just(1,2,0)
.flatMap(id->getUserByid(id))
.doOnError(e-> Log.error("this occur something error"))
.onErrorMap(v->new CustomizeExcetion("服务器开小差了",v));
doOnError 对于流里面的元素只读,也就是他不会对流里面的任务元素操作,记录日志后,会讲错误信号继续抛到后面,让后面去处理。
5.5 finally 确保做一些事情
有的时候我们想要像传统的同步代码那样使用finally去做一些事情,比如关闭http连接,清理资源,那么在reactor中怎么去做finally
Flux.just(1,2,0)
.flatMap(id->getUserByid(id))
.doOnError(e-> Log.error("this occur something error"))
.onErrorMap(v->new CustomizeExcetion("服务器开小差了",v))
.doFinally(System.out.println("我会确保做一些事情"))
;
或者当我们打开一个连接需要关闭资源的时候,还可以这样写
Flux.using(
() -> createHttpClient(),
client -> Flux.just(client.sendRequest()),
createHttpClient::close
);
使用using函数的三个参数,获取client,发送请求,关闭资源。
5.6 retry 重试机制
当遇到一些不可控因素导致的程序失败,但是代码逻辑确实是正确的,这个时候需要重试机制。
Flux.just(1,2,0)
.map(v->2/v)
.retry(1)
.subscribe(System.out::println,System.err::println);
但是需要注意的是重试不是从错误的地方开始重试,相当于对publisher 的重订阅,也就是从零开始重新执行一遍,所以无法达到类似于断点续传的功能,所以使用场景还是有限制。
6、背压(流量控制)
那么在前几篇文章不断提到的背压,可不可以自定义,答案是可以的,通过 Reactor提供的BaseSubscriber来进行自定义我们自己流量控制的subscriber
Flux.just(1,2)
.doOnRequest(s->System.out.println("no. "+s))
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("订阅开始了,我要请求几个元素");
request(1);