1 blockingFirst
T
|
blockingFirst()
返回此Flowable发出的第一个项,如果它没有发出任何项,则抛出NoSuchElementException。
|
T
|
blockingFirst(T defaultItem)
返回此Flowable发出的第一个项,如果它不发出任何项,则返回默认值。
|
1.1 blockingFirst图解
1.2 blockingFirst测试用例
@Test
public void doBlockingFirst() {
System.out.println("######doAny#####");
Integer result = Flowable.just(10, 2, 3, 4, 5).blockingFirst();
System.out.println(result);
######doAny#####
1.3 blockingFirst分析说明
blockingFirst返回发射的第一个item
1.4 实用场景
2 blockingForEach
void
|
blockingForEach(Consumer<? super
T
> onNext)
以阻塞方式消耗上游Flowable并使用当前线程上的每个上游项调用给定的Consumer,直到上游终止。
|
2.1 blockingForEach测试用例
@Test
public void doBlockingForEach() {
System.out.println("######doBlockingForEach#####");
Flowable.just(10, 2, 3, 4, 5).blockingForEach(new Consumer<Integer>() {
@Override
public void accept(Integer result) throws Exception {
System.out.println("blocking result = " + result);
######doBlockingForEach#####
blocking result = 10
blocking result = 2
blocking result = 3
blocking result = 4
blocking result = 5
2.2 blockingForEach说明
blockingForEach会使用指定的Consumer消费Flowable发出的每一个项目
2.4 实用场景
3 blockingIterable
Iterable<T>
|
blockingIterable()
将此Flowable转换为Iterable。
|
Iterable<T>
|
blockingIterable(int bufferSize)
将此Flowable转换为Iterable,限制
bufferSize
|
3.1 blockingIterable图解
3.2 blockingIterable测试用例
@Test
public void blockingIterable() {
System.out.println("######blocksingIterable#####");
Iterable<Integer> iterable = Flowable.just(10, 9, 8, 7).blockingIterable();
for(Integer aIterable :iterable) {
System.out.println("value = " + aIterable);
######blockingIterable#####
value = 10
value = 9
value = 8
value = 7
3.3 实用场景
4 blockingLast
T
|
blockingLast
()
返回此Flowable发出的最后一项,如果此Flowable没有发出任何项,则抛出NoSuchElementException。
|
T
|
blockingLast
(
T
defaultItem)
返回此Flowable发出的最后一项,如果它没有发出任何项,则返回默认值。
|
|
|
4.1 blockingLast图解
4.2 blockingLast测试用例
@Test
public void blockingLast() {
System.out.println("######blockingLast#####");
Integer last = Flowable.just(10, 9, 8, 7).blockingLast();
System.out.println("Last value = "+ last);
######blockingLast#####
Last value = 7
4.3 实用场景
5 blockingLatest
Iterable<T> | blockingLatest() 返回一个Iterable,它将会阻塞,直到此Flowable发出的新项目,并将其返回。 |
5.1 blockingLatest图解
5.2 blockingLatest测试用例
@Test
public void blockingLatest() {
System.out.println("######blockingLatest#####");
Iterable<Integer> latest = Flowable.just(10, 9, 8, 7).delay(1, TimeUnit.SECONDS).blockingLatest();
if(latest.iterator().hasNext()) {
System.out.println("Latest value = "+ latest.iterator().next());
} else {
System.out.println("Latest value is null " );
######blockingLatest#####
Latest value = 10
5.3 实用场景
6 blockingMostRecent
6.1 blockingMostRecent图解
6.2 blockingMostRecent测试用例
@Test
public void blockingMostRecent() {
System.out.println("######blockingMostRecent#####");
Flowable publisher = Flowable.just("我说","123","木头人").delay(100,TimeUnit.MILLISECONDS);
Iterable<String> mostRecent = publisher.blockingMostRecent("null");
mostRecent.forEach(new java.util.function.Consumer<String>() {
@Override
public void accept(String mostRecent) {
System.out.println("mostRecent = " + mostRecent);
######blockingMostRecent#####
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = 我说
mostRecent = 我说
mostRecent = 我说
mostRecent = 123
mostRecent = 123
mostRecent = 123
mostRecent = 木头人
mostRecent = 木头人
mostRecent = 木头人
6.3 blockingMostRecent测试用例分析
blockingMostRecent这个操作符会阻塞等待Flowable发射项目,测试用例中反复打印null就是阻塞等待的证据,直到有新的值发射
6.4 实用场景
7 blockingNext
Iterable<T> | blockingNext() 返回一个Iterable,在每次迭代时阻塞,直到此Flowable发出一个新项,然后Iterable返回该项 |
7.1 blockingNext图解
源码中注释里面指向说明文档图,与takeLast一样
7.2 blockingNext测试用例
@Test
public void blockingNext() {
System.out.println("######blockingNext#####");
Flowable publisher = Flowable.just("我说","123","木头人").delay(1,TimeUnit.SECONDS);
publisher.subscribe(new Consumer<String>() {
@Override
public void accept(String value) throws Exception {
System.out.println("value = " + value);
Iterable<String> next = publisher.blockingNext();
next.forEach(new java.util.function.Consumer<String>() {
@Override
public void accept(String next) {
System.out.println("next = " + next);
测试1结果
######blockingNext#####
value = 我说
value = 123
value = 木头人
next = 我说
@Test
public void blockingNext1() {
System.out.println("######blockingNext#####");
Flowable publisher = Flowable.interval(0, 1, TimeUnit.SECONDS);
Iterable<Long> next = publisher.blockingNext();
publisher.subscribe(new Consumer<Long>() {
@Override
public void accept(Long value) throws Exception {
System.out.println("value = " + value);
next.forEach(new java.util.function.Consumer<Long>() {
@Override
public void accept(Long next) {
System.out.println("next = " + next);
测试二结果
######blockingNext#####
value = 0
next = 0
value = 1
next = 1
value = 2
next = 2
...继续
7.3 blockingNext测试用例分析
Flowable实例在使用blockingNext操作符时会返回一个Iterable,该Iterable在每次迭代时阻塞,直到Flowable发出一个新项,然后Iterable返回该项,上面的测试1只是发射了三个值后就结束了,所以只打印出第一个值便结束了,剩下的还来不及返回便结束了;测试2,每隔1s发射一个长整型的值,从0开始,每发射一次+1,这时候next也会跟着一直打印。其实这说明了一个问题,也就是说
blockingNext操作符是对
Flowable实例的发射做了阻塞式的监听,在Flowable没有结束前就会一直阻塞等待Flowable的发射消息,发射一次返回一次。
7.4 实用场景
8 blockingSingle
T
|
blockingSingle
()
如果此Flowable在发射单个项目后便完成,则返回被发射的单个项目,如果它发出多个项目,则抛出IllegalArgumentException。
源码的注释是:If this {@code Flowable} completes after emitting a single item, return that item, otherwise throw a {@code NoSuchElementException}.
但是NoSuchElementException应该写错了错误的,在做测试时候抛出的异常是IllegalArgumentException,下面重载接口是正确的
|
T
|
blockingSingle
(
T
defaultItem)
如果此Flowable在发射单个项目后便完成,则返回被发射的单个项目; 如果它发出多个项目,则抛出IllegalArgumentException; 如果它没有发出任何项目,则返回默认值。
|
8.1 blockingSingle图解
图解地址:
http://reactivex.io/documentation/operators/first.html
源码中指向的图解就是这个,看起来似乎和某些操作符使用了相同的,这说明这些操作符表达的是一个类似操作
8.2 blockingSingle测试用例
@Test
public void blockingSingle() {
System.out.println("######blockingSingle#####");
Flowable<Long> publisher = Flowable.just(1L, 2L);
Long singleItem = publisher.blockingSingle();
System.out.println("singleItem = " + singleItem);
测试1结果:
######blockingNext#####
java.lang.IllegalArgumentException: Sequence contains more than one element!
at io.reactivex.internal.operators.flowable.FlowableSingleSingle$SingleElementSubscriber.onNext(FlowableSingleSingle.java:82)
at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:84)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Process finished with exit code 255
@Test
public void blockingSingle2() {
System.out.println("######blockingSingle2#####");
Flowable<Long> publisher = Flowable.just(1L);
Long singleItem = publisher.blockingSingle();
System.out.println("singleItem = " + singleItem);
测试2结果
######blockingSingle2#####
singleItem = 1
Process finished with exit code 0
@Test
public void blockingSingle3() {
System.out.println("######blockingSingle2#####");
Flowable<Long> publisher = Flowable.empty();
Long singleItem = publisher.blockingSingle(-1L);
System.out.println("singleItem = " + singleItem);
测试3结果
######blockingSingle2#####
singleItem = -1
Process finished with exit code 0
8.3 blockingSingle测试用例分析
如果Flowable在发射完单个项目便结束了,那么blockingSingle操作符会返回这个发射的结果,如果Flowable发射项目在2个一个上会报错
(java.lang.IllegalArgumentException: Sequence contains more than one element!)
测试3是对其重载方法做测试,通过 Flowable.empty()不会发射项目,blockingSingle操作符会返回默认的值-1
8.4 实用场景
9 blockingSubscribe
void
|
blockingSubscribe()
Runs the source Flowable to a terminal event, ignoring any values and rethrowing any exception.
将观察源运行到结束为止,忽略任何值并重新抛出任何异常。
|
void
|
blockingSubscribe(Consumer<? super T> onNext)
订阅(观察)源并调用当前线程上的给定回调。
|
void
|
blockingSubscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
订阅(观察)源并调用当前线程上的给定回调。
|
void
|
blockingSubscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)
订阅(观察)源并调用当前线程上的给定回调。
|
void
|
blockingSubscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, int bufferSize)
订阅(观察)源并调用当前线程上的给定回调。
|
void
|
blockingSubscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, int bufferSize)
订阅(观察)源并调用当前线程上的给定回调。
|
void
|
blockingSubscribe(Consumer<? super T> onNext, int bufferSize)
订阅(观察)源并调用当前线程上的给定回调。
|
void
|
blockingSubscribe(Subscriber<? super T> subscriber)
订阅源并在当前线程上调用Subscriber方法。
|
9.1 blockingSubscribe测试用例
@Test
public void blockingSubscribe() {
System.out.println("######blockingSubscribe#####");
Flowable<Long> source = Flowable.just(1L, 2L, 3L);
source.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("aLong = " + aLong);
source.blockingSubscribe();
测试1结果
######blockingSubscribe#####
aLong = 1
aLong = 2
aLong = 3
Process finished with exit code 0
@Test
public void blockingSubscribe2() {
System.out.println("######blockingSubscribe#####");
Flowable<Long> publisher = Flowable.just(4L, 5L, 3L);
publisher.blockingSubscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("aLong = " + aLong);
测试2结果
######blockingSubscribe#####
aLong = 4
aLong = 5
aLong = 3
Process finished with exit code 0
9.2 blockingSubscribe测试用例分析
上面只是列举了最初的两个重载blockingSubscribe操作符,而且两个的运行结果看不出来有什么不同,实际上这里体现的是在当前线程订阅。与forEach类似,这个操作符后面会讲到。
9.3 实用场景
目录1 blockingFirst2blockingForEach3blockingIterable4 blockingLast5 blockingLatest6blockingMostRecent7blockingNext8blockingSingle9 blockingSubscribe1 blockingFirstT blockin...
一、阻塞操作符列表
Blocking
Observable已经在
Rx
java
2中去掉了,集成到了Observable中。官方说明不同文档: https://github.com/ReactiveX/
Rx
Java
/wiki/What’s-different-in-2.0
可以看这里:
http://reactivex.io/
Rx
Java
/2.x/
java
doc/io/reactivex/Observa
错误日志:
reactor.core.Exceptions$BubblingException:
java
.lang.IllegalStateException: block()/blockFirst()/blockLast() are
blocking
, which is not supported in thread reactor-http-epoll-4
at reactor.core.Exceptions.bubble(Exceptions.
java
:170)
at reactor.cor
概述过滤操作符用于过滤和选择Observable发射的数据序列,让Observable只返回满足我们条件的数据。DebounceDebounce会过滤掉发射速率过快的数据项,相当于限流,但是需要注意的是debounce过滤掉的数据会被丢弃掉。
如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个。
Rx
Java
将这个操作符实现为throttleWithTimeout和debou
Synchronized/Lock : 通过线程阻塞的方式等待结果返回,代码写起来比较直观
Callbacks:异步方法没有返回值,但需要额外的回调参数(lambda或匿名类),在结果可用时调用它们。
Futures:异步方法立即返回Future 。异步线程计算任务结果,Future对象包装对它的访问。该值不会立即可...
# Database settings
datasource.driver=org.h2.Driver
datasource.url=jdbc:h2:mem:
flowable
;DB_CLOSE_DELAY=-1
datasource.username=sa
datasource.password=
这些配置信息指定了
Flowable
默认使用的 H2 数据库的驱动、URL、用户名和密码。在这里,`jdbc:h2:mem:
flowable
;DB_CLOSE_DELAY=-1` 表示使用 H2 内存数据库,并且在最后一个连接关闭时不关闭数据库。
如果你想使用其他的 H2 数据库配置,你可以将这些配置信息复制到自己的配置文件中,并进行相应的修改。注意,如果你在自己的配置文件中使用了与默认配置相同的配置项,那么自己的配置会覆盖默认配置。同时,你还需要在启动
Flowable
时指定自己的配置文件,例如:
java
-jar
flowable
-6.5.0.jar --spring.config.location=file:/path/to/your/config.properties
这样,
Flowable
就会使用你指定的配置文件来配置 H2 数据库。