只有当原始的Observable中的每一个都发射了 一条数据时 zip 才发射数据。接受一到九个参数
Observable<Long> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
.take(3)
.subscribeOn(Schedulers.newThread())
Observable<Long> observable2 = Observable.interval(200, TimeUnit.MILLISECONDS)
.take(4)
.subscribeOn(Schedulers.newThread())
Observable.zip(observable1, observable2, (aLong, aLong2) -> {
System.out.print("aLong:" + aLong + "\t aLong2:" + aLong2+"\t")
return aLong + aLong2
}).subscribe(o -> System.out.println("===>" + o + "\t"))
aLong:0 aLong2:0===>0
aLong:1 aLong2:1===>2
aLong:2 aLong2:2===>4
zipWith
zip的非静态写法,总是接受两个参数,第一个参数是一个Observable或者一个Iterable。
observable1.zipWith( observable2, (aLong, aLong2) -> {
System.out.print("aLong:" + aLong + "\t aLong2:" + aLong2+"\t");
return aLong + aLong2;
}).subscribe(o -> System.out.println("===>" + o + "\t"));
merge(静态方法)
根据时间线 合并多个observer
Observable<Long> ob1 = Observable.interval(100, TimeUnit.MILLISECONDS)
.take(3)
.subscribeOn(Schedulers.newThread())
Observable<Long> ob2 = Observable.interval(50, TimeUnit.MILLISECONDS)
.take(3)
.map(aLong -> aLong + 10)
.subscribeOn(Schedulers.newThread())
Observable.merge(ob1, ob2)
.subscribe(o -> System.out.print(o + "\t"))
日志结果:可以见出是根据时间线合并
10 10 0 0 11 11 12 12 1 1 2 2
mergeWith
merge非静态写法
ob1.mergeWith(ob2)
.subscribe(o -> System.out.print( o + "\t"))
combineLatest(静态方法)
使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值,它接受二到九个Observable作为参数 或者单 个Observables列表作为参数
Observable<Long> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
.take(4)
.subscribeOn(Schedulers.newThread())
Observable<Long> observable2 = Observable.interval(200, TimeUnit.MILLISECONDS)
.take(5)
.subscribeOn(Schedulers.newThread())
Observable.combineLatest(observable1, observable2, (aLong, aLong2) -> {
System.out.print("aLong:" + aLong + "\t aLong2:" + aLong2+"\t")
return aLong + aLong2
}).subscribe(o -> System.out.println("===>" + o + "\t"))
aLong:1 aLong2:0 ===>1
aLong:2 aLong2:0 ===>2
aLong:3 aLong2:0 ===>3
aLong:3 aLong2:1 ===>4
aLong:3 aLong2:2 ===>5
aLong:3 aLong2:3 ===>6
aLong:3 aLong2:4 ===>7
withLatestFrom
类似zip ,但是只在单个原始Observable发射了一条数据时才发射数据,而不是两个都发
但是注意 如果没有合并元素 既辅助Observable一次都没发射的时候 是不发射数据的
Observable<Long> observable2 = Observable.interval(150, TimeUnit.MILLISECONDS)
.take(4)
.subscribeOn(Schedulers.newThread())
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(3)
.subscribeOn(Schedulers.newThread())
.withLatestFrom(observable2, (aLong, aLong2) -> {
System.out.print("aLong:" + aLong + "\t aLong2:" + aLong2 + "\t")
return aLong + aLong2
.subscribe(o -> System.out.println("===>" + o + "\t"))
明明原始take是3为啥不是三条log呢 因为原始的发送0的时候 ,辅助Observable还没发送过数据
aLong:1 aLong2:0 ===>1
aLong:2 aLong2:1 ===>3
switchMap
和flatMap类似,不同的是当原始Observable发射一个新的数据(Observable)时,它将取消订阅前一个Observable
Observable.interval(500, TimeUnit.MILLISECONDS)
.take(3)
.doOnNext(aLong -> System.out.println())
.switchMap(aLong -> Observable.intervalRange(aLong * 10, 3,
0, 300, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.newThread()))
.subscribe(aLong -> System.out.print(aLong+"\t"))
解析:因为发送2的时候 intervalRange发送第三条数据的时候已经是600ms既 500ms的时候原始数据发送了。导致取消订阅前一个Observable
所以 2 ,12没有发送 但是最后的22发送了 因为原始数据没有新发送的了
// 日志结果
// 0 1
// 10 11
// 20 21 22
// 而不是
// 0 1 2
// 10 11 12
// 20 21 22
startWith
是concat()的对应部分,在Observable开始发射他们的数据之前,startWith()通过传递一个参数来先发射一个数据序列
Observable.just("old")
.startWith("Start")
.startWith("Start2")
.startWith(Observable.just("Other Observable"))
.startWith(Arrays.asList("from Iterable"))
.startWithArray("from Array", "from Array2")
.subscribe(s -> System.out.println(s));
from Array
from Array2
from Iterable
Other Observable
Start2
Start
任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了。一条数据,就结合两个Observable发射的数据
<!-- 此demo 好使但是未让能理解透彻 仅仅想测试能结果的任用 想明白的话 此demo无效 -->
Observable.intervalRange(10, 4, 0, 300, TimeUnit.MILLISECONDS)
.join(Observable.interval(100, TimeUnit.MILLISECONDS)
.take(7)
, aLong -> {
System.out.println("开始收集:"+aLong)
return Observable.just(aLong)
, aLong -> Observable.timer(200, TimeUnit.MILLISECONDS)
, (aLong, aLong2) -> {
System.out.print("aLong:" + aLong + "\t aLong2:" + aLong2 + "\t")
return aLong + aLong2
.subscribe(aLong -> System.out.println(aLong))
判定是否Observable发射的所有数据都满足某个条件
Observable.just(2, 3, 4)
.all(integer -> integer > 3)
.subscribe((aBoolean, throwable) -> System.out.println(aBoolean));
日志:false
给定多个Observable,只让第一个发射数据的Observable发射全部数据
- ambArray(静态方法):根据测试结果这个静态方法发射的最后一个
Observable.ambArray(
Observable.intervalRange(0, 3, 200, 100, TimeUnit.MILLISECONDS)
, Observable.intervalRange(10, 3, 300, 100, TimeUnit.MILLISECONDS)
, Observable.intervalRange(20, 3, 100, 100, TimeUnit.MILLISECONDS)
.doOnComplete(() -> System.out.println("Complete"))
.subscribe(aLong -> System.out.println(aLong))
20 21 22 Complete
Observable.intervalRange(0, 3, 200, 100, TimeUnit.MILLISECONDS)
.ambWith(Observable.intervalRange(10, 3, 300, 100, TimeUnit.MILLISECONDS))
.doOnComplete(() -> System.out.println("Complete"))
.subscribe(aLong -> System.out.println(aLong))
0 1 2 Complete
contains
判定一个Observable是否发射一个特定的值
Observable.just(2, 3, 4)
.contains(2)
.subscribe((aBoolean, throwable) -> System.out.println(aBoolean))
switchIfEmpty
如果原始Observable正常终止后仍然没有发射任何数据,就使用备用的Observable
Observable.empty()
.switchIfEmpty(Observable.just(2, 3, 4))
.subscribe(o -> System.out.println("===>" + o + "\t"))
defaultIfEmpty
发射来自原始Observable的值,如果原始Observable没有发射任何值,就发射一个默认值,内部调用的switchIfEmpty。
Observable.empty()
.defaultIfEmpty(1)
.subscribe(o -> System.out.println("===>" + o + "\t"))
sequenceEqual
判定两个Observables是否发射相同的数据序列。(数据,发射顺序,终止状态)
Observable.sequenceEqual(
Observable.just(2, 3, 4)
, Observable.just(2, 3, 4))
.subscribe((aBoolean, throwable) -> System.out.println(aBoolean))
<!-- 它还有一个版本接受第三个参数,可以传递一个函数用于比较两个数据项是否相同。 -->
Observable.sequenceEqual(
Observable.just(2, 3, 4)
, Observable.just(2, 3, 4)
, (integer, integer2) -> integer + 1 == integer2)
.subscribe((aBoolean, throwable) -> System.out.println(aBoolean))
skipUntil
丢弃原始Observable发射的数据,直到第二个Observable发射了一项数据
Observable.intervalRange(30, 20, 500, 100, TimeUnit.MILLISECONDS)
.skipUntil(Observable.timer(1000, TimeUnit.MILLISECONDS))
.doOnNext(integer -> System.out.println(integer))
//此时用这个主要是 测试环境 有执行时间 所以用阻塞比较好
.blockingSubscribe()
skipWhile
丢弃Observable发射的数据,直到一个指定的条件不成立
Observable.just(1,2,3,4)
//从2开始 因为2条件不成立
.skipWhile(aLong -> aLong==1)
.doOnNext(integer -> System.out.println(integer))
//此时用这个主要是 测试环境 有执行时间 所以用阻塞比较好
.blockingSubscribe()
takeUntil
当第二个Observable发射了一项数据或者终止时,丢弃原始Observable发射的任何数据
Observable.just(2,3,4,5)
.takeUntil(integer -> integer<=4)
.subscribe(o -> System.out.print(o + "\t"));//2,3,4
Observable.intervalRange(30, 20, 500, 100, TimeUnit.MILLISECONDS)
.takeUntil(Observable.timer(1000, TimeUnit.MILLISECONDS))
.doOnNext(integer -> System.out.println(integer))
.doOnComplete(() -> System.out.println("Complete"))
//此时用这个主要是 测试环境 有执行时间 所以用阻塞比较好
.blockingSubscribe();
takeWhile
发射Observable发射的数据,直到一个指定的条件不成立
Observable.just(2,3,4,5)
.takeWhile(integer ->integer<=4 )
.subscribe(o -> System.out.print(o + "\t"))
onErrorReturn
让Observable遇到错误时发射一个特殊的项并且正常终止
<!-- 遇到错误处理范例 -->
Observable.error(new Throwable("我擦 空啊"))
.onErrorReturnItem("hei")
.subscribe(o -> System.out.println("===>" + o + "\t")
, throwable -> System.out.println("===>throwable")
, () -> System.out.println("===>complete"))
===>hei
===>complete
<!-- 遇到错误不处理范例 -->
Observable.error(new Throwable("我擦 空啊"))
.onErrorReturn(throwable -> {
System.out.println("错误信息:" + throwable.getMessage())
return throwable
.subscribe(o -> System.out.println("===>" + o + "\t")
, throwable -> System.out.println("===>throwable")
, () -> System.out.println("===>complete"))
错误信息:我擦 空啊
===>java.lang.Throwable: 我擦 空啊
===>complete
resumeNext
让Observable在遇到错误时开始发射第二个Observable的数据序列
- onErrorResumeNext:可以处理所有的错误
Observable.error(new Throwable("我擦 空啊"))
.onErrorResumeNext(throwable -> {
System.out.println("错误信息:" + throwable.getMessage());
return Observable.range(0, 3);
.subscribe(o -> System.out.print("===>" + o + "\t")
, throwable -> System.out.print("===>throwable"+ "\t")
, () -> System.out.print("===>complete"+ "\t"));
错误信息:我擦 空啊
===>0 ===>1 ===>2 ===>complete
- onExceptionResumeNext:只能处理异常
Throwable 不是一个 Exception ,它会将错误传递给观察者的 onError 方法,不会使用备用 的Observable。
Observable.error(new Throwable("我擦 空啊"))
.onExceptionResumeNext(observer -> Observable.range(0, 3))
.subscribe(o -> System.out.println("===>" + o + "\t")
, throwable -> System.out.println("===>throwable")
, () -> System.out.println("===>complete"));
===>throwable
retry
如果原始Observable遇到错误,重新订阅它期望它能正常终止
Observable.create(e -> {
e.onNext(1);
e.onNext(2);
e.onError(new Throwable("hehe"));
.retry(2)
.subscribe(o -> System.out.print("===>" + o + "\t")
, throwable -> System.out.print("===>throwable\t")
, () -> System.out.print("===>complete\t"));
===>1 ===>2 ===>1 ===>2 ===>1 ===>2 ===>throwable
- 变体Predicate 条件判定 如果返回 true retry,false 放弃 retry
Observable.create(e -> {
e.onNext(1)
e.onNext(2)
e.onError(new Throwable("hehe"))
.retry(throwable -> throwable.getMessage().equals("hehe1"))
.subscribe(o -> System.out.print("===>" + o + "\t")
, throwable -> System.out.print("===>throwable\t")
, () -> System.out.print("===>complete\t"))
===>1 ===>2 ===>throwable
retryWhen
需要一个Observable 通过判断 throwableObservable,Observable发射一个数据 就重新订阅,发射的是 onError 通知,它就将这个通知传递给观察者然后终止。
Observable.just(1, "2", 3)
.cast(Integer.class)
.retryWhen(throwableObservable -> Observable.timer(1, TimeUnit.SECONDS))
.retryWhen(throwableObservable -> Observable.interval(1, TimeUnit.SECONDS)
.take(3))
.subscribe(o -> System.out.println("retryWhen 1===>" + o + "\t")
, throwable -> System.out.println("retryWhen 1===>throwable")
, () -> System.out.println("retryWhen 1===>complete"));
Observable.just(1, "2", 3)
.cast(Integer.class)
.retryWhen(throwableObservable -> {
return throwableObservable.switchMap(throwable -> {
if (throwable instanceof IllegalArgumentException)
return Observable.just(throwable);
// else{
// PublishSubject<Object> pb = PublishSubject.create();
// pb .onError(throwable);
// return pb;
// }
//方法泛型
return Observable.<Object>error(throwable);
// return Observable.just(1).cast(String.class);
.subscribe(o -> System.out.println("retryWhen 2===>" + o + "\t")
, throwable -> System.out.println("retryWhen 2===>throwable")
, () -> System.out.println("retryWhen 2===>complete"));
retryWhen 2===>1
retryWhen 2===>throwable
toList
Observable.just(1, 2, 3)
.toList().blockingGet()
.forEach(aLong -> System.out.println(aLong))
toSortList
Observable.just(5, 2, 3)
.toSortedList()
.blockingGet()
.forEach(integer -> System.out.println(integer))
toMap
Map<String, Integer> map = Observable.just(5, 2, 3)
.toMap(integer -> integer + "_"
, integer -> integer + 10
, () -> new HashMap<>())
.blockingGet();
toFuture
这个操作符将Observable转换为一个返 回单个数据项的 Future 带有返回值的任务
如果原始Observable发射多个数据项, Future 会收到1个 IllegalArgumentException
如果原始Observable没有发射任何数据, Future 会收到一 个 NoSuchElementException
如果你想将发射多个数据项的Observable转换为 Future ,可以这样 用: myObservable.toList().toFuture()
Observable.just(1, 2, 3)
.toList()//转换成Single<List<T>> 这样就变成一个数据了
.toFuture()
.get()
.forEach(integer -> System.out.println(integer))
blockingSubscribe
Observable.just(1, 2, 3)
.blockingSubscribe(integer -> System.out.println(integer))
blockingForEach
对BlockingObservable发射的每一项数据调用一个方法,会阻塞直到Observable完成。
Observable.interval(100, TimeUnit.MILLISECONDS)
.doOnNext(aLong -> {
if (aLong == 10)
throw new RuntimeException()
}).onErrorReturnItem(-1L)
.blockingForEach(aLong -> System.out.println(aLong))
blockingIterable
Observable.just(1, 2, 3)
.blockingIterable()
// .blockingIterable(5)
.forEach(aLong -> System.out.println("aLong:" + aLong))
blockingFirst
Observable.empty()
.blockingFirst(-1));
blockingLast
Observable.just(1,2,3)
// .blockingLast()
//带默认值版本
.blockingLast(-1))
blockingMostRecent
返回一个总是返回Observable最近发射的数据的Iterable,类似于while的感觉
Iterable<Long> c = Observable.interval(100, TimeUnit.MILLISECONDS)
.doOnNext(aLong -> {
if (aLong == 10)
throw new RuntimeException()
}).onErrorReturnItem(-1L)
.blockingMostRecent(-3L)
for (Long aLong : c) {
System.out.println("aLong:" + aLong)
日志很长 可以自己一试变知
blockingSingle
终止时只发射了一个值,返回那个值
empty 无默认值 报错, 默认值的话显示默认值
多个值的话 有无默认值都报错
System.out.println("emit 1 value:" + Observable.just(1).blockingSingle())
System.out.println("default empty single:" + Observable.empty().blockingSingle(-1))
System.out.println("default emit 1 value:" + Observable.just(1).blockingSingle(-1))
try {
System.out.println("empty single:" + Observable.empty().blockingSingle())
System.out.println("emit many value:" + Observable.just(1, 2).blockingSingle())
System.out.println("default emit many value:" + Observable.just(1, 2)
.blockingSingle(-1))
} catch (Exception e) {
e.printStackTrace()
emit 1 value:1
default empty single:-1
default emit 1 value:1
java.util.NoSuchElementException
compose
有多个 Observable ,并且他们都需要应用一组相同的 变换
<!-- 用一个工具类去写 这样符合单一职责 -->
public class RxComposes {
public static <T> ObservableTransformer<T, T> applyObservableAsync() {
return upstream -> upstream.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Observable.empty()
.compose(RxComposes.applyObservableAsync())
.subscribe(integer -> System.out.println("ob3:" + integer));
ConnectableObservable
可连接的Observable在 被订阅时并不开始发射数据,只有在它的 connect() 被调用时才开始用这种方法,你可以 等所有的潜在订阅者都订阅了这个Observable之后才开始发射数据。即使没有任何订阅者订阅它,你也可以使用 connect 让他发射
replay(Observable的方法)
每次订阅 都对单个订阅的重复播放一边
bufferSize:对源发射队列的缓存数量, 从而对新订阅的进行发射
Observable的方法 返回是ConnectableObservable
切记要让ConnectableObservable具有重播的能力,必须Obserable的时候调用replay,而不是ConnectableObservable 的时候调用replay
//this is OK,too!
ConnectableObservable<Integer> co = Observable.just(1, 2, 3)
//类似 publish直接转成 ConnectableObservable 切记要重复播放的话必须Obserable的时候调用replay
//而不是ConnectableObservable 的时候调用replay 所以 .publish().replay()则无效
.replay(3);//重复播放的 是1 2 3
// .replay(2);//重复播放的 是 2 3
co.doOnSubscribe(disposable -> System.out.print("订阅1:"))
.doFinally(() -> System.out.println())
.subscribe(integer -> System.out.print(integer + "\t"));
co.connect();//此时开始发射数据 不同与 refCount 只发送一次
co.doOnSubscribe(disposable -> System.out.print("订阅2:"))
.doFinally(() -> System.out.println())
.subscribe(integer -> System.out.print(integer + "\t"));
co.doOnSubscribe(disposable -> System.out.print("订阅3:"))
.doFinally(() -> System.out.println())
.subscribe(integer -> System.out.print(integer + "\t"));
replay(3)日志:只能缓存原始队列的两个【1,2,3】
订阅1:1 2 3
订阅2:1 2 3
订阅3:1 2 3
replay(2)日志:只能缓存原始队列的两个【2,3】
订阅1:1 2 3
订阅2: 2 3
订阅3: 2 3
publish(Observable的方法)
将普通的Observable转换为可连接的Observable
ConnectableObservable<Integer> co = Observable.just(1, 2, 3)
.publish()
co.subscribe(integer -> System.out.println("订阅1:" + integer))
co.subscribe(integer -> System.out.println("订阅2:" + integer))
co.subscribe(integer -> System.out.println("订阅3:" + integer))
co.connect()
- refCount(ConnectableObservable的方法): 操作符把从一个可连接的Observable连接和断开的过程自动化了, 就像reply的感觉式样 每次订阅 都对单个订阅的重复播放一边
Observable<Integer> co = Observable.just(1, 2, 3)
.publish()
//类似于reply 跟时间线有关 订阅开始就开始发送
.refCount();
co.doOnSubscribe(disposable -> System.out.print("订阅1:"))
.doFinally(() -> System.out.println())
.subscribe(integer -> System.out.print(integer + "\t"));
co.doOnSubscribe(disposable -> System.out.print("订阅2:"))
.doFinally(() -> System.out.println())
.subscribe(integer -> System.out.print(integer + "\t"));
Observable.timer(300, TimeUnit.MILLISECONDS)
.doOnComplete(() -> {
co.doOnSubscribe(disposable -> System.out.print("订阅3:"))
.doFinally(() -> System.out.println())
.subscribe(integer -> System.out.print(integer + "\t"));
}).blockingSubscribe();
订阅1:1 2 3
订阅2:1 2 3
订阅3:1 2 3
目录目录合并操作符zip静态方法zipWithmerge静态方法mergeWithcombineLatest静态方法withLatestFromswitchMapstartWithjoin条件操作allambcontainsswitchIfEmptydefaultIfEmptysequenceEqualskipUntilskipWhiletakeUntilt
Obersvable的种类
Observable 表示可以发射0-N个Item信号,当收到Complete信号或者Error信号结束,所以这里就有一种可能是成功发射了若干个然后失败,特别注意这点
Flowable 从之前的 Observable 中分离出来的,有Observable 的特性的同时支持背压
Single 表示只会发送一个信号或者一个Error信号就结束的Observable
May...
zip函数允许你传入多个请求,然后合并出另外的结果传出来,普通的用法就不多说了,网上一堆介绍的
然后做项目时有个疑问点,Observable.zip如果传入一个列表,合并列表里的所有请求的时候,请求回来的顺序是未知的,返回回来的数组是否会按传入时的顺序返回回来呢。于是做了以下实验:
Integer[] skuSerials={1,2,3,4,5};
ArrayList<Observable<Integer>> requestList=new Array
static final ObservableTransformer schedulersTransformer = new ObservableTransformer() {
@Override
public ObservableSo...
一、阻塞操作符列表BlockingObservable已经在Rxjava2中去掉了,集成到了Observable中。官方说明不同文档: https://github.com/ReactiveX/RxJava/wiki/What’s-different-in-2.0
可以看这里:
http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observa
* 把多个Observable合并后,并且把这些Observable的数据进行转换再发射出去。转换之后的数据数目由最短数据长度的那个Observable决定。发射完最终会自动调用观察者的onComplete方法()
* 如以下代码: 数据长度为6的observable1和数据长度为4的observable2进行合并转换后,观察者只接收到4个数据
Observable observable1 = Observabl...
RxJava中blockingSubscribe的作用
RxJava中blockingSubscribe的作用是阻塞主线程,直到前面流式代码中的工作完成。举两个例子就可以看出其中的区别了。
##使用blockingSubscribe
Log.d("======", "Before blockingSubscribe");
Log.d("======", "Before Thread: " + Thread.currentThread());
Flowable.range(1, 6)