相关文章推荐
飘逸的作业本  ·  mysql datetime ...·  3 周前    · 

遇到错误时,发送1个特殊事件 & 正常终止
可捕获在它之前发生的异常

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) {
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new Throwable("发生错误了"));
                .onErrorReturn(new Function<Throwable, Integer>() {
                    @Override
                    public Integer apply(Throwable throwable) throws Exception {
                        Log.e(TAG, "在onErrorReturn处理了错误: " + throwable.toString());
                        return 666;
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");

onErrorResumeNext()

遇到错误时,发送1个新的Observable
onErrorResumeNext()拦截的错误 = Throwable;若需拦截Exception请用onExceptionResumeNext()
若onErrorResumeNext()拦截的错误 = Exception,则会将错误传递给观察者的onError方法

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) {
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new Throwable("发生错误了"));
               .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
                   @Override
                   public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
                       // 1. 捕捉错误异常
                       Log.e(TAG, "在onErrorReturn处理了错误: "+throwable.toString() );
                       // 2. 发生错误事件后,发送一个新的被观察者 & 发送事件序列
                       return Observable.just(110,119);
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");

onExceptionResumeNext()

遇到错误时,发送1个新的Observable
onExceptionResumeNext()拦截的错误 = Exception;若需拦截Throwable请用onErrorResumeNext()
若onExceptionResumeNext()拦截的错误 = Throwable,则会将错误传递给观察者的onError方法

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) {
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new Exception("发生错误了"));
                .onExceptionResumeNext(new Observable<Integer>() {
                    @Override
                    protected void subscribeActual(Observer<? super Integer> observer) {
                        observer.onNext(11);
                        observer.onNext(22);
                        observer.onComplete();
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");

retry()

重试,即当出现错误时,让被观察者(Observable)重新发射数据
接收到 onError()时,重新订阅 & 发送事件
Throwable 和 Exception都可拦截
共有5种重载方法
<– 1. retry() –>
// 注:若一直错误,则一直重新发送
<– 2. retry(long time) –>
// 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制
// 参数 = 重试次数
<– 3. retry(Predicate predicate) –>
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试)
// 参数 = 判断逻辑
<– 4. retry(new BiPredicate

//1. retry()
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) {
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new Exception("发生错误了"));
                .retry()
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
//2. retry(long time)
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) {
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new Exception("发生错误了"));
                .retry(3)// 设置重试次数 = 3次
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
//3. retry(Predicate predicate)
        // 拦截错误后,判断是否需要重新发送请求
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) {
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new Exception("发生错误了"));
                .retry(new Predicate<Throwable>() {
                    @Override
                    public boolean test(Throwable throwable) throws Exception {
                        // 捕获异常 Log.e(TAG, "retry错误: "+throwable.toString());
                        // 返回false = 不重新重新发送数据 & 调用观察者的onError结束
                        // 返回true = 重新发送请求(若持续遇到错误,就持续重新发送)
                        return false;
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
//4. retry(new BiPredicate<Integer, Throwable>)
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) {
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new Exception("发生错误了"));
                // 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试
                // 参数 =  判断逻辑(传入当前重试次数 & 异常错误信息)
                .retry(new BiPredicate<Integer, Throwable>() {
                    @Override
                    public boolean test(Integer integer, Throwable throwable) throws Exception {
                        // 捕获异常
                         Log.e(TAG, "异常错误 = "+throwable.toString());
                        // 获取当前重试次数
                         Log.e(TAG, "当前重试次数 = "+integer);
                        // 返回false = 不重新重新发送数据 & 调用观察者的onError结束
                        // 返回true = 重新发送请求(若持续遇到错误,就持续重新发送)
                        return false;
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
//5. retry(long time,Predicate predicate) 
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) {
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new Exception("发生错误了"));
                // 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制
                // 参数 = 设置重试次数 & 判断逻辑
                .retry(3,new Predicate<Throwable>() {
                    @Override
                    public boolean test(Throwable throwable) throws Exception {
                        // 捕获异常 Log.e(TAG, "retry错误: "+throwable.toString());
                        // 返回false = 不重新重新发送数据 & 调用观察者的onError结束
                        // 返回true = 重新发送请求(若持续遇到错误,就持续重新发送)
                        return false;
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");

retryUntil()

出现错误后,判断是否需要重新发送数据
若需要重新发送 & 持续遇到错误,则持续重试
作用类似于retry(Predicate predicate)
具体使用
具体使用类似于retry(Predicate predicate),唯一区别:返回 true 则不重新发送数据事件。

repeatWhen()

作用 有条件地、重复发送 被观察者事件
遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者
(Observable)& 发送事件
将原始 Observable 停止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable),以此决定是否重新订阅 & 发送原来的 Observable
若新被观察者(Observable)返回1个Complete / Error事件,则不重新订阅 & 发送原来的 Observable
若新被观察者(Observable)返回其余事件时,则重新订阅 & 发送原来的 Observable

作者:Carson_Ho
链接:https://www.jianshu.com/p/b0c3669affdb
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

 call.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
            // 在Function函数中,必须对输入的 Observable<Object>进行处理,此处使用flatMap操作符接收上游的数据
            @Override
            public ObservableSource<?> apply(Observable<Object> objectObservable) {
                // 将原始 Observable 停止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)
                // 以此决定是否重新订阅 & 发送原来的 Observable,即轮询 // 此处有2种情况:
                // 1. 若返回1个Complete() / Error()事件,则不重新订阅 & 发送原来的 Observable,即轮询结束
                // 2. 若返回其余事件,则重新订阅 & 发送原来的 Observable,即继续轮询
                return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Object o) {
                        // 加入判断条件:当轮询次数 = 5次后,就停止轮询
                        if (i > 3) {
                            // 此处选择发送onError事件以结束轮询,因为可触发下游观察者的onError()方法回调
                            Observable.error(new Throwable("轮询结束"));
                        // 若轮询次数<4次,则发送1Next事件以继续轮询
                        // 注:此处加入了delay操作符,作用 = 延迟一段时间发送(此处设置 = 2s)
                        return Observable.just(1).delay(2000, TimeUnit.SECONDS);
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Translation>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    @Override
                    public void onNext(Translation translation) {
                        // e.接收服务器返回的数据
                        String result = translation.show();
                        tvContent.setText(result);
                        Log.e(TAG, "onNext: 第" + i + "次轮询");
                    @Override
                    public void onError(Throwable e) {
                        // 获取轮询结束信息
                        Log.d(TAG, e.toString());
                    @Override
                    public void onComplete() {

retryWhen()

遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者(Observable)& 发送事件.

.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            // 参数Observable<Throwable>中的泛型 = 上游操作符抛出的异常,可通过该条件来判断异常的类型
            // 返回Observable<?> = 新的被观察者 Observable(任意类型)
            // 此处有两种情况:
            // 1. 若 新的被观察者 Observable发送的事件 = Error事件,那么 原始Observable则不重新发送事件:
            // 2. 若 新的被观察者 Observable发送的事件 = Next事件 ,那么原始的Observable则重新发送事件:
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                // 1. 若返回的Observable发送的事件 = Error事件,则原始的Observable不重新发送事件
                // 该异常错误信息可在观察者中的onError()中获得
                return Observable.error(new Throwable("retryWhen终止啦"));
                // 2. 若返回的Observable发送的事件 = Next事件,则原始的Observable重新发送事件(若持续遇到错误,则持续重试)
                // return Observable.just(1);