相关文章推荐
要出家的煎饼果子  ·  CREATE INDEX ...·  2 周前    · 
阳刚的小狗  ·  Java8 Stream ...·  2 周前    · 
读研的人字拖  ·  Java 读取 .properties ...·  1 周前    · 
眼睛小的木耳  ·  spring boot test ...·  1 周前    · 
冷冷的松鼠  ·  vue中 ...·  2 月前    · 

1 blockingFirst

T blockingFirst()

返回此Flowable发出的第一个项,如果它没有发出任何项,则抛出NoSuchElementException。

T blockingFirst(T defaultItem)

返回此Flowable发出的第一个项,如果它不发出任何项,则返回默认值。

1.1 blockingFirst图解

1.2 blockingFirst测试用例

@Test public void doBlockingFirst() { System.out.println("######doAny#####"); Integer result = Flowable.just(10, 2, 3, 4, 5).blockingFirst(); System.out.println(result); ######doAny#####

1.3 blockingFirst分析说明

blockingFirst返回发射的第一个item

1.4 实用场景

2 blockingForEach

void blockingForEach(Consumer<? super T > onNext)

以阻塞方式消耗上游Flowable并使用当前线程上的每个上游项调用给定的Consumer,直到上游终止。

2.1 blockingForEach测试用例

    @Test
    public void doBlockingForEach() {
        System.out.println("######doBlockingForEach#####");
        Flowable.just(10, 2, 3, 4, 5).blockingForEach(new Consumer<Integer>() {
            @Override
            public void accept(Integer result) throws Exception {
                System.out.println("blocking result = " + result);
######doBlockingForEach#####
blocking result = 10
blocking result = 2
blocking result = 3
blocking result = 4
blocking result = 5

2.2 blockingForEach说明

blockingForEach会使用指定的Consumer消费Flowable发出的每一个项目

2.4 实用场景

3 blockingIterable

Iterable<T> blockingIterable()

将此Flowable转换为Iterable。

Iterable<T> blockingIterable(int bufferSize)

将此Flowable转换为Iterable,限制 bufferSize

3.1 blockingIterable图解

3.2 blockingIterable测试用例

    @Test
    public void blockingIterable() {
        System.out.println("######blocksingIterable#####");
        Iterable<Integer> iterable = Flowable.just(10, 9, 8, 7).blockingIterable();
        for(Integer aIterable :iterable) {
            System.out.println("value = " + aIterable);
######blockingIterable#####
value = 10
value = 9
value = 8
value = 7

3.3 实用场景

4 blockingLast

T blockingLast ()

返回此Flowable发出的最后一项,如果此Flowable没有发出任何项,则抛出NoSuchElementException。

T blockingLast ( T defaultItem)

返回此Flowable发出的最后一项,如果它没有发出任何项,则返回默认值。

4.1 blockingLast图解

4.2 blockingLast测试用例

    @Test
    public void blockingLast() {
        System.out.println("######blockingLast#####");
        Integer last = Flowable.just(10, 9, 8, 7).blockingLast();
        System.out.println("Last value = "+ last);
######blockingLast#####
Last value = 7

4.3 实用场景

5 blockingLatest

Iterable<T>blockingLatest()

返回一个Iterable,它将会阻塞,直到此Flowable发出的新项目,并将其返回。

 

5.1 blockingLatest图解

5.2 blockingLatest测试用例

   @Test
    public void blockingLatest() {
        System.out.println("######blockingLatest#####");
        Iterable<Integer> latest = Flowable.just(10, 9, 8, 7).delay(1, TimeUnit.SECONDS).blockingLatest();
        if(latest.iterator().hasNext()) {
            System.out.println("Latest value = "+ latest.iterator().next());
        } else {
            System.out.println("Latest value is null " );
######blockingLatest#####
Latest value = 10

5.3 实用场景

6 blockingMostRecent

Iterable<T>blockingMostRecent(T initialItem) 返回一个Iterable,会阻塞等待此Flowable发出最近的项,并将这个项返回。

6.1 blockingMostRecent图解

6.2 blockingMostRecent测试用例

    @Test
    public void blockingMostRecent() {
        System.out.println("######blockingMostRecent#####");
        Flowable publisher = Flowable.just("我说","123","木头人").delay(100,TimeUnit.MILLISECONDS);
        Iterable<String> mostRecent =  publisher.blockingMostRecent("null");
        mostRecent.forEach(new java.util.function.Consumer<String>() {
            @Override
            public void accept(String mostRecent) {
                System.out.println("mostRecent = " + mostRecent);
######blockingMostRecent#####
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = 我说
mostRecent = 我说
mostRecent = 我说
mostRecent = 123
mostRecent = 123
mostRecent = 123
mostRecent = 木头人
mostRecent = 木头人
mostRecent = 木头人

6.3 blockingMostRecent测试用例分析

blockingMostRecent这个操作符会阻塞等待Flowable发射项目,测试用例中反复打印null就是阻塞等待的证据,直到有新的值发射

6.4 实用场景

7 blockingNext

Iterable<T>

blockingNext() 返回一个Iterable,在每次迭代时阻塞,直到此Flowable发出一个新项,然后Iterable返回该项

7.1 blockingNext图解

源码中注释里面指向说明文档图,与takeLast一样

7.2 blockingNext测试用例

@Test public void blockingNext() { System.out.println("######blockingNext#####"); Flowable publisher = Flowable.just("我说","123","木头人").delay(1,TimeUnit.SECONDS); publisher.subscribe(new Consumer<String>() { @Override public void accept(String value) throws Exception { System.out.println("value = " + value); Iterable<String> next = publisher.blockingNext(); next.forEach(new java.util.function.Consumer<String>() { @Override public void accept(String next) { System.out.println("next = " + next); 测试1结果 ######blockingNext##### value = 我说 value = 123 value = 木头人 next = 我说 @Test public void blockingNext1() { System.out.println("######blockingNext#####"); Flowable publisher = Flowable.interval(0, 1, TimeUnit.SECONDS); Iterable<Long> next = publisher.blockingNext(); publisher.subscribe(new Consumer<Long>() { @Override public void accept(Long value) throws Exception { System.out.println("value = " + value); next.forEach(new java.util.function.Consumer<Long>() { @Override public void accept(Long next) { System.out.println("next = " + next); 测试二结果 ######blockingNext##### value = 0 next = 0 value = 1 next = 1 value = 2 next = 2 ...继续

7.3 blockingNext测试用例分析

Flowable实例在使用blockingNext操作符时会返回一个Iterable,该Iterable在每次迭代时阻塞,直到Flowable发出一个新项,然后Iterable返回该项,上面的测试1只是发射了三个值后就结束了,所以只打印出第一个值便结束了,剩下的还来不及返回便结束了;测试2,每隔1s发射一个长整型的值,从0开始,每发射一次+1,这时候next也会跟着一直打印。其实这说明了一个问题,也就是说 blockingNext操作符是对 Flowable实例的发射做了阻塞式的监听,在Flowable没有结束前就会一直阻塞等待Flowable的发射消息,发射一次返回一次。

7.4 实用场景

8 blockingSingle

T blockingSingle ()

如果此Flowable在发射单个项目后便完成,则返回被发射的单个项目,如果它发出多个项目,则抛出IllegalArgumentException。

源码的注释是:If this {@code Flowable} completes after emitting a single item, return that item, otherwise throw a {@code NoSuchElementException}.

但是NoSuchElementException应该写错了错误的,在做测试时候抛出的异常是IllegalArgumentException,下面重载接口是正确的

T blockingSingle ( T defaultItem)

如果此Flowable在发射单个项目后便完成,则返回被发射的单个项目; 如果它发出多个项目,则抛出IllegalArgumentException; 如果它没有发出任何项目,则返回默认值。

8.1 blockingSingle图解

图解地址: http://reactivex.io/documentation/operators/first.html

源码中指向的图解就是这个,看起来似乎和某些操作符使用了相同的,这说明这些操作符表达的是一个类似操作

8.2 blockingSingle测试用例

@Test public void blockingSingle() { System.out.println("######blockingSingle#####"); Flowable<Long> publisher = Flowable.just(1L, 2L); Long singleItem = publisher.blockingSingle(); System.out.println("singleItem = " + singleItem); 测试1结果: ######blockingNext##### java.lang.IllegalArgumentException: Sequence contains more than one element! at io.reactivex.internal.operators.flowable.FlowableSingleSingle$SingleElementSubscriber.onNext(FlowableSingleSingle.java:82) at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:84) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Process finished with exit code 255 @Test public void blockingSingle2() { System.out.println("######blockingSingle2#####"); Flowable<Long> publisher = Flowable.just(1L); Long singleItem = publisher.blockingSingle(); System.out.println("singleItem = " + singleItem); 测试2结果 ######blockingSingle2##### singleItem = 1 Process finished with exit code 0 @Test public void blockingSingle3() { System.out.println("######blockingSingle2#####"); Flowable<Long> publisher = Flowable.empty(); Long singleItem = publisher.blockingSingle(-1L); System.out.println("singleItem = " + singleItem); 测试3结果 ######blockingSingle2##### singleItem = -1 Process finished with exit code 0

8.3 blockingSingle测试用例分析

如果Flowable在发射完单个项目便结束了,那么blockingSingle操作符会返回这个发射的结果,如果Flowable发射项目在2个一个上会报错
(java.lang.IllegalArgumentException: Sequence contains more than one element!)

测试3是对其重载方法做测试,通过 Flowable.empty()不会发射项目,blockingSingle操作符会返回默认的值-1

8.4 实用场景

9 blockingSubscribe

void blockingSubscribe()

Runs the source Flowable to a terminal event, ignoring any values and rethrowing any exception.

将观察源运行到结束为止,忽略任何值并重新抛出任何异常。

void blockingSubscribe(Consumer<? super T> onNext)

订阅(观察)源并调用当前线程上的给定回调。

void blockingSubscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)

订阅(观察)源并调用当前线程上的给定回调。

void blockingSubscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)

订阅(观察)源并调用当前线程上的给定回调。

void blockingSubscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, int bufferSize)

订阅(观察)源并调用当前线程上的给定回调。

void blockingSubscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, int bufferSize)

订阅(观察)源并调用当前线程上的给定回调。

void blockingSubscribe(Consumer<? super T> onNext, int bufferSize)

订阅(观察)源并调用当前线程上的给定回调。

void blockingSubscribe(Subscriber<? super T> subscriber)

订阅源并在当前线程上调用Subscriber方法。


9.1 blockingSubscribe测试用例

@Test public void blockingSubscribe() { System.out.println("######blockingSubscribe#####"); Flowable<Long> source = Flowable.just(1L, 2L, 3L); source.subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { System.out.println("aLong = " + aLong); source.blockingSubscribe(); 测试1结果 ######blockingSubscribe##### aLong = 1 aLong = 2 aLong = 3 Process finished with exit code 0 @Test public void blockingSubscribe2() { System.out.println("######blockingSubscribe#####"); Flowable<Long> publisher = Flowable.just(4L, 5L, 3L); publisher.blockingSubscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { System.out.println("aLong = " + aLong); 测试2结果 ######blockingSubscribe##### aLong = 4 aLong = 5 aLong = 3 Process finished with exit code 0

9.2 blockingSubscribe测试用例分析

上面只是列举了最初的两个重载blockingSubscribe操作符,而且两个的运行结果看不出来有什么不同,实际上这里体现的是在当前线程订阅。与forEach类似,这个操作符后面会讲到。

9.3 实用场景

目录1 blockingFirst2blockingForEach3blockingIterable4 blockingLast5 blockingLatest6blockingMostRecent7blockingNext8blockingSingle9 blockingSubscribe1 blockingFirstT blockin...
一、阻塞操作符列表 Blocking Observable已经在 Rx java 2中去掉了,集成到了Observable中。官方说明不同文档: https://github.com/ReactiveX/ Rx Java /wiki/What’s-different-in-2.0 可以看这里: http://reactivex.io/ Rx Java /2.x/ java doc/io/reactivex/Observa
错误日志: reactor.core.Exceptions$BubblingException: java .lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking , which is not supported in thread reactor-http-epoll-4 at reactor.core.Exceptions.bubble(Exceptions. java :170) at reactor.cor
概述过滤操作符用于过滤和选择Observable发射的数据序列,让Observable只返回满足我们条件的数据。DebounceDebounce会过滤掉发射速率过快的数据项,相当于限流,但是需要注意的是debounce过滤掉的数据会被丢弃掉。 如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个。 Rx Java 将这个操作符实现为throttleWithTimeout和debou
Synchronized/Lock : 通过线程阻塞的方式等待结果返回,代码写起来比较直观 Callbacks:异步方法没有返回值,但需要额外的回调参数(lambda或匿名类),在结果可用时调用它们。 Futures:异步方法立即返回Future 。异步线程计算任务结果,Future对象包装对它的访问。该值不会立即可... # Database settings datasource.driver=org.h2.Driver datasource.url=jdbc:h2:mem: flowable ;DB_CLOSE_DELAY=-1 datasource.username=sa datasource.password= 这些配置信息指定了 Flowable 默认使用的 H2 数据库的驱动、URL、用户名和密码。在这里,`jdbc:h2:mem: flowable ;DB_CLOSE_DELAY=-1` 表示使用 H2 内存数据库,并且在最后一个连接关闭时不关闭数据库。 如果你想使用其他的 H2 数据库配置,你可以将这些配置信息复制到自己的配置文件中,并进行相应的修改。注意,如果你在自己的配置文件中使用了与默认配置相同的配置项,那么自己的配置会覆盖默认配置。同时,你还需要在启动 Flowable 时指定自己的配置文件,例如: java -jar flowable -6.5.0.jar --spring.config.location=file:/path/to/your/config.properties 这样, Flowable 就会使用你指定的配置文件来配置 H2 数据库。