遇到错误时,发送1个特殊事件 & 正常终止
可捕获在它之前发生的异常
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new Throwable("发生错误了"));
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
Log.e(TAG, "在onErrorReturn处理了错误: " + throwable.toString());
return 666;
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
onErrorResumeNext()
遇到错误时,发送1个新的Observable
onErrorResumeNext()拦截的错误 = Throwable;若需拦截Exception请用onExceptionResumeNext()
若onErrorResumeNext()拦截的错误 = Exception,则会将错误传递给观察者的onError方法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new Throwable("发生错误了"));
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
Log.e(TAG, "在onErrorReturn处理了错误: "+throwable.toString() );
return Observable.just(110,119);
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
onExceptionResumeNext()
遇到错误时,发送1个新的Observable
onExceptionResumeNext()拦截的错误 = Exception;若需拦截Throwable请用onErrorResumeNext()
若onExceptionResumeNext()拦截的错误 = Throwable,则会将错误传递给观察者的onError方法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new Exception("发生错误了"));
.onExceptionResumeNext(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observer.onNext(11);
observer.onNext(22);
observer.onComplete();
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
retry()
重试,即当出现错误时,让被观察者(Observable)重新发射数据
接收到 onError()时,重新订阅 & 发送事件
Throwable 和 Exception都可拦截
共有5种重载方法
<– 1. retry() –>
// 注:若一直错误,则一直重新发送
<– 2. retry(long time) –>
// 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制
// 参数 = 重试次数
<– 3. retry(Predicate predicate) –>
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试)
// 参数 = 判断逻辑
<– 4. retry(new BiPredicate
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new Exception("发生错误了"));
.retry()
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new Exception("发生错误了"));
.retry(3)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new Exception("发生错误了"));
.retry(new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) throws Exception {
return false;
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new Exception("发生错误了"));
.retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(Integer integer, Throwable throwable) throws Exception {
Log.e(TAG, "异常错误 = "+throwable.toString());
Log.e(TAG, "当前重试次数 = "+integer);
return false;
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new Exception("发生错误了"));
.retry(3,new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) throws Exception {
return false;
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
retryUntil()
出现错误后,判断是否需要重新发送数据
若需要重新发送 & 持续遇到错误,则持续重试
作用类似于retry(Predicate predicate)
具体使用
具体使用类似于retry(Predicate predicate),唯一区别:返回 true 则不重新发送数据事件。
repeatWhen()
作用 有条件地、重复发送 被观察者事件
遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者
(Observable)& 发送事件
将原始 Observable 停止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable),以此决定是否重新订阅 & 发送原来的 Observable
若新被观察者(Observable)返回1个Complete / Error事件,则不重新订阅 & 发送原来的 Observable
若新被观察者(Observable)返回其余事件时,则重新订阅 & 发送原来的 Observable
作者:Carson_Ho
链接:https://www.jianshu.com/p/b0c3669affdb
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
call.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) {
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Object o) {
if (i > 3) {
Observable.error(new Throwable("轮询结束"));
return Observable.just(1).delay(2000, TimeUnit.SECONDS);
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
@Override
public void onNext(Translation translation) {
String result = translation.show();
tvContent.setText(result);
Log.e(TAG, "onNext: 第" + i + "次轮询");
@Override
public void onError(Throwable e) {
Log.d(TAG, e.toString());
@Override
public void onComplete() {
retryWhen()
遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者(Observable)& 发送事件.
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return Observable.error(new Throwable("retryWhen终止啦"));