相关文章推荐
俊秀的冲锋衣  ·  Spring ...·  1 年前    · 
探究WebFlux之Reactor

探究WebFlux之Reactor

上篇文章主要介绍了响应式流,也提到了非常著名的Reactor,因为webflux响应式的实现底层引用的核心库就是reactor,所以我们 有必要去了解和使用reactor,这样才能在响应式编程里面如鱼得水。本篇文章 需要了解 java 8 lambada和响应式编程的一些概念,具体可以看前两篇文章

这篇文章会带主要讲的是reactor的基本操作,因为时间和字数限制,一篇文章把所有的东西讲出来也不太可能,所以只会讲一些便于初次接触响应式编程的新手快速入门,想要更加全面了解reactor的东西可以去reactor的文档。

1、引入依赖

1.1 maven

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId> 
          <version>3.3.5.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId> 
        <scope>test</scope>   
        <version>3.3.5.RELEASE</version>
    </dependency>
</dependencies>

1.2 gradle

implementation 'io.projectreactor:reactor-core:3.3.5.RELEASE'
implementation 'io.projectreactor:reactor-test:3.3.5.RELEASE'
2、基本概念

2、简介

在reactor中有两个最基本的概念,发布者和订阅者,可以理解为生产者和消费者的概念。在Reactor中发布者有两个,一个是 Flux ,一个是 Mono。 Flux代表的是0-N个元素的响应式序列,而Mono代表的是0-1个的元素的结果。

可能你会觉的很奇怪,既然Flux可以发布0-N个元素 也包括了1个元素,那为什么还要Mono这个东西,举个例子 一个接口 有且只会返回一个元素,你还会去用List包装吗,或者是一次http请求 只会返回一个响应,既然已经确定了他只会返回一个元素,那么为啥还有用多个取表示返回呢,这是语义上规范差别。其实这种规范是为了避免一些只对Flux多个元素的操作,无意义的在只有一个元素Flux上操作,用Mono这种语义上的规范去杜绝这种问题。

来看看Flux和Mono怎么创建自己的序列

        //声明的参数就是数据流的元素
        Flux.just(1,2,3);
        Mono.just(1);

当然也有其他的生成方式

        Flux.fromArray(new String[]{"a","b","c"});
        Flux.fromStream(new ArrayList<>().stream());

对于 Flux和Mono来说,just 是数据完成的信号,那如果不是通过just声明的数据流,没有这种数据准备完成的信号,那么这个流就是一个无限流。除了我们手动声明数据准备的完成,错误信号也标志这整个流的完成。

        Flux.error(new RuntimeException());

还有一种情况就是当Flux和Mono没有发出任何一个元素,而是直接发出了完成信号,那么这个流就是一个空的流,像这样。

        Flux.error(new RuntimeException());
        Flux.just();
        Flux.empty();

那么既然有空的流,那一定有应用的场景,如果我们db采用的是响应式驱动的库,在查询数据的时候会有查询为空的情况,这个时候返回的就是空的流。

还有很重要的一点就是 Flux.just(1,2,4) 只是定义了一个数据流而已,在 subscribe() 之前的操作什么也不会发生,这一点让我想起了java 8 stream的惰性求值,在中止操作之前的操作都不会触发(stream 惰性求值的原理我已经在前面的文章分析过了,文章链接就在顶部)。

例如我们要打印刚刚声明的数据流,需要这样做

        Flux.just(1, 2, 3).subscribe(System.out::println);

另外 subscribe 时,还可以指定错误的回调处理,以及数据处理完的完成回调

图片来自于https://projectreactor.io/docs/core/release/reference/index.html

所以Subscribe 可以这样写 更加完善

Flux.error(new Exception("error")).subscribe(
        System.out::println,
        System.err::println,
        () -> System.out.println("Completed!")
);


3、如何调试reactor

在我们传统阻塞代码里面,调试(Debug)的时候是一件非常简单的事情,通过打断点,得到相关的stack 的信息,就可以很清楚的知道错误信息(不过在多线程的环境下去打断点,需要切换线程环境,也有点麻烦)。

但是在reactor 环境下去调试代码并不是一件简单的事情,最常见的就是 一个 Flux流,怎么去得到每个元素信息,怎么去知道在管道里面下一个元素是什么,每个元素是否像期望的那样做了操作。所以这也是从传统编程切换到响应式编程的难点,开发人员需要花时间去学习这个操作,但是感觉难受总是好的,因为做什么都太容易的话,自己会长期止步于此,像早期的EJB到j2ee,ssh -> ssm -> spring boot -> spring cloud ,从微服务->service mesh -> serve less ,到现在一些一线大厂盛行的中台。也许这一次就是改变自己的时候。

言归正传,关于比较复杂的调试后期再说,我们先从最基本的单元测试开始。官方推荐的工具是 StepVerifier, 就在上面引入依赖的test包里面,来简单看一个示例。

    @Test
    public void reactorTest(){
        StepVerifier.create(Flux.just(1,2)) //1
              .expectNext(1,2) //2 
              .expectComplete() //3 
              .verify(); //4 
    }
  1. 创建测试的异步流
  2. 测试期望的值
  3. 测试是否完成
  4. 验证

这里先看一个简单的单元测试,后续会详细讲解reactor环境下的测试方法,我们首先要熟悉的是reactor的一些常用的api(这里需要读者有lambada 的基础)

3.1 map

这里的map和java 8 stream的map是同一个意思,用于元素的转换,像这样

    @Test
    public void reactorMapTest(){
        StepVerifier.create(Flux.just(1,2)
                .map(v->v+1))
                .expectNext(2,3)
                .expectComplete()
                .verify();
    }

还是之前的代码,只是对每一个元素都自增加一,这里就不多说了,对lambada熟悉的同学都了解。

3.2 flatmap

flatmap也是对元素的转换,但是不同的是flatmap是将元素转换为流,再将流合并为一个大的流。

    @Test
    public void reactorFlatMapTest(){
        StepVerifier.create(Flux.just("crabman","is","hero")
                .flatMap(v->Flux.fromArray(v.split("")))
                .doOnNext(System.out::println))
                .expectNextCount(13)
                .verifyComplete();
    }
tips :flatmap 和 map的区别
从源码上来看 map就是一个function函数,输入一个输出一个,对于flatmap来讲它接受的是输出为Publisher的function,也就是说对于flatmap来讲 输入一个值,输出的是一个Publisher,所以map是一对一的关系,而flatmap 是一对多或者多对多的关系,并且两者输出也不一样。那flatmap 的应用场景在哪里,例如一个接口,入参是List<id>,用户id 的集合,需求是返回每个id 对应的具体信息,所以代码看起来就是这样 xx.flatmap(id->getUserInfoById(id))

3.3 filter

reactor 的filter和java 8 stream 的filter是一样的,就不多说了,这里过滤掉值为2 的

    @Test
    public void reactorFilterTest(){
        StepVerifier.create(Flux.just(1,2)
                .map(v->v+1)
                .filter(s->s!=2)
                .doOnNext(System.out::println))
                .expectNext(3)
                .expectComplete()
                .verify();
    }

3.4 zip

这个是操作可能看起来比较陌生,顾名思义,“压缩”就是将多个流一对一的合并起来,还有一个原因,因为在每个flux流或者mono流里面,各个流的速度是不一样,zip还有个作用就是将两个流进行同步对齐。例如我们这里在加入另一个流,这个流会不停的发出元素,为了让大家可以感受到同,这里限制另一个流的速度为没1秒发出一个元素,这样合并的流也会向另一个流对齐。

    @Test
    public void reactorZipTest(){
        //定义一个Flux流
        Flux<String> stringFlux = Flux.just("a", "b", "c", "d");
        //这里使用计时器,因为在单元测试里面,可能元素没执行完,他就会直接返回
        CountDownLatch countDownLatch = new CountDownLatch(1);  // 2
        Flux.zip(stringFlux,Flux.interval(Duration.ofSeconds(1)))
                .subscribe(t->System.out.println(t.getT1())
                        ,System.err::println
                        ,countDownLatch::countDown);
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
    }

上面讲的这四个是比较常用的,还有很多,由于篇幅限制所以没有办法全部罗列出来,感兴趣的可以去官网文档看更多的api

4、Reactor中的多线程

在我们java的传统的编程中,对于线程之间的调度有封装好的线程池工具类供我们使用,或者我们可以通过线程池的构造函数定义自己的线程池,这一切都让多线程的调度都变得很容易,那么在reactor中怎么处理线程的调度

4.1 Schedulers

在reactor中处理线程调度的不叫thread pool,而是Schedulers(调度器),通过调度器就可以创建出供我们直接使用的多线程环境。

4.1.1 Schedulers.immediate()

在当前线程中使用

4.1.2 Schedulers.single()

创建一个可重用的单线程环境,该方法的所有调用者都会重复使用同一个线程。

4.1.3 Schedulers.elastic()

创建一个弹性线程池,会重用空闲线程,当线程池空闲时间过长就会自动废弃掉。通常使用的场景是给一个阻塞的任务分配自己的线程,从而不会影响到其他任务的执行。

4.1.4 Schedulers.parallel()

创建一个固定大小的线程池,线程池大小和cpu个数相等。

来看一个具体使用的实例,通过 Schedulers.elastic() 将一个同步阻塞的方法改写成异步的。

 private Integer syncMethod(){
         try {
             TimeUnit.SECONDS.sleep(2);
         } catch (InterruptedException e) {
             e.printStackTrace();
        return 123456;
     @Test
     public void switchSyncToAsyncTest(){
         CountDownLatch countDownLatch = new CountDownLatch(1);
         Mono.fromCallable(()->syncMethod())
                 .subscribeOn(Schedulers.elastic())
                 .subscribe(System.out::println,null,countDownLatch::countDown);
         try {
             countDownLatch.await();
         } catch (InterruptedException e) {
             e.printStackTrace();
     }

简单分析上述代码,通过fromCallable声明 一个callable 的mono,然后通过subscribeOn 切换环境,调度任务到单独的弹性线程池工作。

5、错误处理

在传统的编程中,我们处理单个接口错误的方式,可能是 try-catch-finally的方式,也可能是try-winth-resource的语法糖,这些在reactor中变得不太一样。下面来说一说reactor中的几种错误处理方式。

5.1 onErrorReturn

onErrorReturn在发生错误的时候,会提供一个缺省值,类似于安全取值的问题,但是这个在响应式流里面会更加实用。

 Flux.just(1,2,0)
              .map(v->2/v)
              .onErrorReturn(0)
              .map(v->v*2)
              .subscribe(System.out::println,System.err::println);

这样就可以在处理一些未知元素的时候,又不想让未知因素中止程序的继续运行,就可以采取这种方式。

5.2 onErrorResume

在发生错误的时候,提供一个新的流或者值返回,这样说可能不太清楚,看代码。

        Flux.just(1,2,0)
                 //调用redis服务获取数据
                .flatMap(id->redisService.getUserByid(id))
                //当发生异常的时候,从DB用户服务获取数据
                .onErrorResume(v->userService.getUserByCache(id));

类似于错误的一个callback;

5.3 onErrorMap

上面的都是我们去提供缺省的方法或值处理错误,但是有的时候,我们需要抛出错误,但是需要将错误包装一下,可读性好一点,也就是抛出自定义异常。

                Flux.just(1,2,0)
                .flatMap(id->getUserByid(id))
                .onErrorMap(v->new CustomizeExcetion("服务器开小差了",v));


5.4 doOnError 记录错误日志

在发生错误的时候我们需要记录日志,在reactor里面专门独立出api记录错误日志

        Flux.just(1,2,0)
                .flatMap(id->getUserByid(id))
                .doOnError(e-> Log.error("this occur something error"))
                .onErrorMap(v->new CustomizeExcetion("服务器开小差了",v));

doOnError 对于流里面的元素只读,也就是他不会对流里面的任务元素操作,记录日志后,会讲错误信号继续抛到后面,让后面去处理。

5.5 finally 确保做一些事情

有的时候我们想要像传统的同步代码那样使用finally去做一些事情,比如关闭http连接,清理资源,那么在reactor中怎么去做finally

        Flux.just(1,2,0)
                .flatMap(id->getUserByid(id))
                .doOnError(e-> Log.error("this occur something error"))
                .onErrorMap(v->new CustomizeExcetion("服务器开小差了",v))
                .doFinally(System.out.println("我会确保做一些事情"))
        ;

或者当我们打开一个连接需要关闭资源的时候,还可以这样写

        Flux.using(
                () -> createHttpClient(),
                client -> Flux.just(client.sendRequest()),
                createHttpClient::close
        );

使用using函数的三个参数,获取client,发送请求,关闭资源。

5.6 retry 重试机制

当遇到一些不可控因素导致的程序失败,但是代码逻辑确实是正确的,这个时候需要重试机制。

      Flux.just(1,2,0)
              .map(v->2/v)
              .retry(1)
              .subscribe(System.out::println,System.err::println);

但是需要注意的是重试不是从错误的地方开始重试,相当于对publisher 的重订阅,也就是从零开始重新执行一遍,所以无法达到类似于断点续传的功能,所以使用场景还是有限制。

6、背压(流量控制)

那么在前几篇文章不断提到的背压,可不可以自定义,答案是可以的,通过 Reactor提供的BaseSubscriber来进行自定义我们自己流量控制的subscriber

        Flux.just(1,2)
                .doOnRequest(s->System.out.println("no. "+s))
                .subscribe(new BaseSubscriber<Integer>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        System.out.println("订阅开始了,我要请求几个元素");
                        request(1);