错误处理操作符 帮助从 Observable发出的错误通知中 恢复或做出反应.

例如我们可以:

  • 接收错误并切换到一个备份可观察者以继续序列
  • 接收错误并发出一个默认项
  • 接收错误并且立即尝试重启这个失败的Observable
  • 接收错误并且在一定时间间隔后尝试重启这个失败的Observable

错误处理操作符分类

错误处理操作符分为两类

  • Catch ― 拦截原始Observable的 onError 通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止.

  • Retry — 当原始Observable 发出一个错误通知,不会将通知传递给 onError ,而是重新订阅以期望他能正常完成.

错误处理操作符使用示例

Catch

onErrorComplete

拦截源Observable错误通知,并将其转换为完成通知发出

也可指定一个Predicate,用以控制何时对错误通知转换成完成通知,何时不转换.

    private void onErrorComplete() {
        Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                        emitter.onNext("A");
                        emitter.onNext("B");
                        emitter.onError(new Throwable("出错了"));
                        emitter.onNext("C");
                }).onErrorComplete()
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                    @Override
                    public void onNext(@NonNull String s) {
                        Log.i(TAG, "onErrorComplete onNext: " + s);
                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.i(TAG, "onErrorComplete onError: " + e.getMessage());
                    @Override
                    public void onComplete() {
                        Log.i(TAG, "onErrorComplete onComplete" );
                });
27799-27799/com.sky.rxjava I/sky>>>: onErrorComplete onNext: A
27799-27799/com.sky.rxjava I/sky>>>: onErrorComplete onNext: B
27799-27799/com.sky.rxjava I/sky>>>: onErrorComplete onComplete
onErrorResumeNext

拦截源Observable错误通知,转换为一个Observable 发出一系列项.

    private void onErrorResumeNext() {
        Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                        emitter.onNext("A");
                        emitter.onNext("B");
                        emitter.onError(new Throwable("出错了"));
                        emitter.onNext("C");
                }).onErrorResumeNext(new Function<Throwable, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Throwable throwable) throws Throwable {
                        return Observable.just("X", "Y", "Z");
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Throwable {
                        Log.i(TAG, "onErrorResumeNext: " + s);
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Throwable {
                        Log.i(TAG, "onErrorResumeNext onError: " + throwable.getMessage());
                });
29457-29457/com.sky.rxjava I/sky>>>: onErrorResumeNext onNext: A
29457-29457/com.sky.rxjava I/sky>>>: onErrorResumeNext onNext: B
29457-29457/com.sky.rxjava I/sky>>>: onErrorResumeNext onNext: X
29457-29457/com.sky.rxjava I/sky>>>: onErrorResumeNext onNext: Y
29457-29457/com.sky.rxjava I/sky>>>: onErrorResumeNext onNext: Z
29457-29457/com.sky.rxjava I/sky>>>: onErrorResumeNext onComplete
onErrorReturn

指示响应式类型在遇到错误时发出指定Function返回的项.

    private void onErrorReturn() {
        Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                        emitter.onNext("A");
                        emitter.onNext("B");
                        emitter.onError(new Throwable("出错了"));
                        emitter.onNext("C");
                }).onErrorReturn(new Function<Throwable, String>() {
            		//通过此函数,将错误通知转换为想要的类型
                    @Override
                    public String apply(Throwable throwable) throws Throwable {
                        return "**Sky**";
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                    @Override
                    public void onNext(@NonNull String s) {
                        Log.i(TAG, "onErrorReturn onNext: " + s);
                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.i(TAG, "onErrorReturn onError: " + e.getMessage());
                    @Override
                    public void onComplete() {
                        Log.i(TAG, "onErrorReturn onComplete" );
                });
5463-5463/com.sky.rxjava I/sky>>>: onErrorReturn onNext: A
5463-5463/com.sky.rxjava I/sky>>>: onErrorReturn onNext: B
5463-5463/com.sky.rxjava I/sky>>>: onErrorReturn onNext: **Sky**
5463-5463/com.sky.rxjava I/sky>>>: onErrorReturn onComplete
onErrorReturnItem

指示反应类型在遇到错误时发出特定项

    private void onErrorReturnItem() {
        Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                        emitter.onNext("A");
                        emitter.onNext("B");
                        emitter.onError(new Throwable("出错了"));
                        emitter.onNext("C");
                }).onErrorReturnItem("~sky~")//收到错误通知时,转换为发出 此项,并发出完成通知
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                    @Override
                    public void onNext(@NonNull String s) {
                        Log.i(TAG, "onErrorReturnItem onNext: " + s);
                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.i(TAG, "onErrorReturnItem onError: " + e.getMessage());
                    @Override
                    public void onComplete() {
                        Log.i(TAG, "onErrorReturnItem onComplete" );
                });
6151-6151/com.sky.rxjava I/sky>>>: onErrorReturnItem onNext: A
6151-6151/com.sky.rxjava I/sky>>>: onErrorReturnItem onNext: B
6151-6151/com.sky.rxjava I/sky>>>: onErrorReturnItem onNext: ~sky~
6151-6151/com.sky.rxjava I/sky>>>: onErrorReturnItem onComplete

Retry

retry

不会将原始Observable的onError通知传递给观察者,它会重新订阅这个Observable,再给它一次机会无错误地完成它的数据序列。Retry总是传递onNext通知给观察者,由于重新订阅,可能会造成数据项重复.

    private void retry() {
        Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                        emitter.onNext("A");
                        emitter.onNext("B");
                        emitter.onError(new Throwable("出错了"));
                        emitter.onNext("C");
                }).retry()
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Throwable {
                        Log.i(TAG, "retry: " + s);
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Throwable {
                        Log.i(TAG, "retry onError: " + throwable.getMessage());
                });
7651-7651/com.sky.rxjava I/sky>>>: retry: A
7651-7651/com.sky.rxjava I/sky>>>: retry: B
7651-7651/com.sky.rxjava I/sky>>>: retry: A
7651-7651/com.sky.rxjava I/sky>>>: retry: B
7651-7651/com.sky.rxjava I/sky>>>: retry: A
7651-7651/com.sky.rxjava I/sky>>>: retry: B
7651-7651/com.sky.rxjava I/sky>>>: retry: A
    ......

retry()还支持传递long类型的参数,表示重试的次数,如果在重试次数用尽,还没成功结束,则将最后一次的onError发出.

retry()也支持传递一个 Predicate用以控制某个错误通知 是否进行重新订阅.

retryUntil

通过指定的函数接口,判断是否对错误通知进行重试,直到这个函数接口返回true.就不再重新订阅,当函数接口返回true时,还未成功结束,就将最后一次订阅的onError()发出

    private void retryUntil() {
        final int[] i = {0};
        Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                        emitter.onNext("A");
                        emitter.onNext("B");
                        emitter.onError(new Throwable("出错了"));
                        emitter.onNext("C");
                .retryUntil(new BooleanSupplier() {
                    @Override
                    public boolean getAsBoolean() throws Throwable {
                        //每次接收到错误时,如果重新订阅次数小于3次,就返回false 重新订阅,并且次数+1
                        //大于3次时,就返回true不再重新订阅,并将最后一次onError发出
                        if (i[0] > 3) {
                            return true;
                        i[0]++;
                        return false;
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Throwable {
                        Log.i(TAG, "retryUntil: " + s);
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Throwable {
                        Log.i(TAG, "retryUntil onError: " + throwable.getMessage());
                });
8827-8827/com.sky.rxjava I/sky>>>: retryUntil: A
8827-8827/com.sky.rxjava I/sky>>>: retryUntil: B
8827-8827/com.sky.rxjava I/sky>>>: retryUntil: A
8827-8827/com.sky.rxjava I/sky>>>: retryUntil: B
8827-8827/com.sky.rxjava I/sky>>>: retryUntil: A
8827-8827/com.sky.rxjava I/sky>>>: retryUntil: B
8827-8827/com.sky.rxjava I/sky>>>: retryUntil: A
8827-8827/com.sky.rxjava I/sky>>>: retryUntil: B
8827-8827/com.sky.rxjava I/sky>>>: retryUntil onError: 出错了
retryWhen

将所有的错误通知传递给一个Observable,以确定是否重新订阅,以及重新订阅的时间.

通过Function 函数接口,返回一个Observable,这个Observable有几项,就表示需要重试几次,而每项发出的时间,就是重试的时间.

    private void retryWhen() {
        Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                        emitter.onNext("A");
                        emitter.onNext("B");
                        emitter.onError(new Throwable("出错了"));
                        emitter.onNext("C");
                .retryWhen(new Function<Observable<Throwable>, ObservableSource<Long>>() {
                    @Override
                    public ObservableSource<Long> apply(Observable<Throwable> throwableObservable) throws Throwable {
                      	//这里从0开始 每隔2秒发送一项,一共法发送3项,最后看输出结果,重试了3次,
                        //加上原来一次,一共订阅4次,并且重试每次间隔就是我们设置的2秒
                        //这里的 throwableObservable 里面包含我们所有的OnError信息,原始的和重试的.可以使用一个观察者订阅它就可以观察它的内容
                        return Observable.intervalRange(0,3,0,2, TimeUnit.SECONDS);
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                    @Override
                    public void onNext(@NonNull String s) {
                        Log.i(TAG, "retryWhen onNext: " + s);
                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.i(TAG, "retryWhen onError: " + e.getMessage());
                    @Override
                    public void onComplete() {
                        Log.i(TAG, "retryWhen onComplete" );
                });
RxJava是响应式编程(Reactive Extensions)在JVM平台上的实现,即用Java语言实现的一套基于观察者模式的异步编程接口。
1.1 RxJava简述
RxJava是一个在Java虚拟机上,使用可观察的序列构成基于事件的,异步的程序库。
1.2 RxAndroid简述
RxAndroid是基于RxJava开发出的一套适用于Android开发的辅助库。
1.3 设计模式...
http://reactivex.io/RxJava/3.x/javadoc/
https://github.com/ReactiveX/RxJava/wiki/What's-different-in-3.0
implementation 'io.reactivex.rxjava3:rxjava:3.x.y'
implementation 'io.reactivex.rxjava3:rxandroid:3.x.y'
Java 8(来源于官方文档)
				
Buffer 定期收集 Observable 的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。 如果原来的 Observable 发射了一个 onError 通知,buffer 会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始 Observable 发射的数据。 buffer(count) 每次取count项发射,最后一次发射可能不足count。 Observable.range(1, 10) .buffer(3) .subscr
RxJava另一块比较复杂的部分是背压,其中的难点不在于本身,而是背压的表现可能和我们随意畅想的情景不一样,以至于种种的不可理解。先看一个一个基本用法 Observable换成Flowable Observer换成Subscriber ObservableOnSubscribe换成FlowableOnSubscribe ObservableEmitter换成FlowableEmitter 增加一个参数表示背压策略 MISSING ERROR BUFFER LATEST 1、subscribe() Observable.create(new ObservableOnSubscribe<Integer>() { //创建事件发射器 @Override public void subscribe(ObservableEmitter<Integer> em...
1.1 用操作符组合Observable 对于ReactiveX来说,Observable和Observer仅仅是个开始,它们本身不过是标准观察者模式的一些轻量级扩展,目的是为了更好的处理事件序列。 ReactiveX真正强大的地方在于它的操作符操作符让你可以变换、组合、操纵和处理Observable发射的数据。 Rx操作符让你可以用声明式的风格组合异步操作序列,它拥有...
参考:[Android开发] RxJava2之路七 - 错误处理操作符例子Demo retry操作符 重试的意思,拦截到错误,然后让 被观察者重新发射数据。Throwable和Exception都额可以拦截 它有五种参数方法: retry(): 让被观察者重新发射数据,要是一直错误就一直发送了 retry(BiPredicate): interger是第几次重新发送,Throwable是错...
* 即,当发生的异常 = 网络异常 = IO异常 才选择重试 需求2:实现重试 * 通过返回的Observable发送的事件 = Next事件,从而使得retryWhen()重订阅,最终实现重试功能 需求3:延迟1段时间再重试 * 采用delay操作符 = 延迟一段时间发送,以实... 三、RxJava flatMap操作符用法详解 四、RxJava concatMap操作符用法详解 五、RxJava onErrorResumeNext操作符实现app与服务器间token机制 六、RxJava retryWhen操作符...