RxJava2 实战系列文章

RxJava2 实战知识梳理(1) - 后台执行耗时操作,实时通知 UI 更新
RxJava2 实战知识梳理(2) - 计算一段时间内数据的平均值
RxJava2 实战知识梳理(3) - 优化搜索联想功能
RxJava2 实战知识梳理(4) - 结合 Retrofit 请求新闻资讯
RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作
RxJava2 实战知识梳理(6) - 基于错误类型的重试请求
RxJava2 实战知识梳理(7) - 基于 combineLatest 实现的输入表单验证
RxJava2 实战知识梳理(8) - 使用 publish + merge 优化先加载缓存,再读取网络数据的请求过程
RxJava2 实战知识梳理(9) - 使用 timer/interval/delay 实现任务调度
RxJava2 实战知识梳理(10) - 屏幕旋转导致 Activity 重建时恢复任务
RxJava2 实战知识梳理(11) - 检测网络状态并自动重试请求
RxJava2 实战知识梳理(12) - 实战讲解 publish & replay & share & refCount & autoConnect
RxJava2 实战知识梳理(13) - 如何使得错误发生时不自动停止订阅关系
RxJava2 实战知识梳理(14) - 在 token 过期时,刷新过期 token 并重新发起请求
RxJava2 实战知识梳理(15) - 实现一个简单的 MVP + RxJava + Retrofit 应用

1.1 应用背景

在网络请求时,有时候会出现需要进行重试的情况,重试的时候,有以下几点需要注意:

  • 限制重试的次数
  • 根据错误类型,判断是否要重试
  • 根据错误类型,等待特定的时间之后再去重试

我们先来看一下目前的一些网络框架是怎么做的?通过分析 Volley 的源码,可以从 BasicNetwork 的代码中看到,它是将网络请求的代码都放在一个无限的 while(true) 循环当中,如果发生了异常,会在其中的 catch 语句中进行处理,如果需要继续重试,那么就吞掉这个异常,并将重试次数加 1 ,这样就会进入下一次的 while(true) 循环去访问网络;如果不需要重试,那么就抛出这个异常,退出这个无限循环。也就是实现了前面两点需求。

下面我们就来演示如何通过 RxJava2 来轻松实现上面的三点需求,通过这篇文章,我们将学习 retryWhen 操作符的具体用法, retryWhen repeatWhen 经常被大家用来比较,如果对 repeatWhen 感兴趣的同学可以阅读上一篇文章 RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作

2.2 示例代码

在下面的例子中,我们一共发起了五次请求,也就是 subscribe 中的代码,其中前四次请求都调用 onError 方法通知下游请求失败,同时带上了自定义的错误信息 wait_short wait_long ,第五次才返回正确的数据。

当我们收到错误之后,会根据错误的类型确定重试的时间,同时,我们还保存了当前重试的次数,避免无限次的重试请求。如果需要重试,那么通过 Timer 操作符延时指定的时间,否则返回 Observable.error(Throwable) 放弃重试。

public class RetryActivity extends AppCompatActivity {
    private static final String TAG = RetryActivity.class.getSimpleName();
    private static final String MSG_WAIT_SHORT = "wait_short";
    private static final String MSG_WAIT_LONG = "wait_long";
    private static final String[] MSG_ARRAY = new String[] {
            MSG_WAIT_SHORT,
            MSG_WAIT_SHORT,
            MSG_WAIT_LONG,
            MSG_WAIT_LONG
    private TextView mTvRetryWhen;
    private CompositeDisposable mCompositeDisposable;
    private int mMsgIndex;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_retry);
        mTvRetryWhen = (TextView) findViewById(R.id.tv_retry_when);
        mTvRetryWhen.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                startRetryRequest();
        mCompositeDisposable = new CompositeDisposable();
    private void startRetryRequest() {
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                int msgLen = MSG_ARRAY.length;
                doWork();
                //模拟请求的结果,前四次都返回失败,并将失败信息递交给retryWhen。
                if (mMsgIndex < msgLen) { //模拟请求失败的情况。
                    e.onError(new Throwable(MSG_ARRAY[mMsgIndex]));
                    mMsgIndex++;
                } else { //模拟请求成功的情况。
                    e.onNext("Work Success");
                    e.onComplete();
        }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            private int mRetryCount;
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Throwable throwable) throws Exception {
                        String errorMsg = throwable.getMessage();
                        long waitTime = 0;
                        switch (errorMsg) {
                            case MSG_WAIT_SHORT:
                                waitTime = 2000;
                                break;
                            case MSG_WAIT_LONG:
                                waitTime = 4000;
                                break;
                            default:
                                break;
                        Log.d(TAG, "发生错误,尝试等待时间=" + waitTime + ",当前重试次数=" + mRetryCount);
                        mRetryCount++;
                        return waitTime > 0 && mRetryCount <= 4 ? Observable.timer(waitTime, TimeUnit.MILLISECONDS) : Observable.error(throwable);
        DisposableObserver<String> disposableObserver = new DisposableObserver<String>() {
            @Override
            public void onNext(String value) {
                Log.d(TAG, "DisposableObserver onNext=" + value);
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "DisposableObserver onError=" + e);
            @Override
            public void onComplete() {
                Log.d(TAG, "DisposableObserver onComplete");
        observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
        mCompositeDisposable.add(disposableObserver);
    private void doWork() {
        long workTime = (long) (Math.random() * 500) + 500;
        try {
            Log.d(TAG, "doWork start,  threadId=" + Thread.currentThread().getId());
            Thread.sleep(workTime);
            Log.d(TAG, "doWork finished");
        } catch (InterruptedException e) {
            e.printStackTrace();

上述代码的运行结果为,红框中的间隔就是每次等待重试的时间:

二、示例解析

2.1 retryWhen 介绍

retryWhen的原理图如下所示:

retryWhen 原理图


retryWhen提供了 重订阅 的功能,对于retryWhen来说,它的重订阅触发有两点要素:

  • 上游通知retryWhen本次订阅流已经完成,询问其是否需要重订阅,该询问是以onError事件触发的。
  • retryWhen根据onError的类型,决定是否需要重订阅,它通过返回一个ObservableSource<?>来通知,如果该ObservableSource返回onComplete/onError,那么不会触发重订阅;如果发送onNext,那么会触发重订阅。

实现retryWhen的关键在于如何定义它的Function参数:

  • Function的输入是一个Observable<Throwable>,输出是一个泛型ObservableSource<?>。如果我们接收Observable<Throwable>发送的消息,那么就可以得到上游发送的错误类型,并根据该类型进行响应的处理。
  • 如果输出的Observable发送了onComplete或者onError则表示不需要重订阅,结束整个流程;否则触发重订阅的操作。也就是说,它 仅仅是作为一个是否要触发重订阅的通知onNext发送的是什么数据并不重要。
  • 对于每一次订阅的数据流 Function 函数只会回调一次,并且是在onError(Throwable throwable)的时候触发,它不会收到任何的onNext事件。
  • Function函数中,必须对输入的 Observable<Object>进行处理,这里我们使用的是flatMap操作符接收上游的数据,对于flatMap的解释,大家可以参考 RxJava2 实战知识梳理(4) - 结合 Retrofit 请求新闻资讯

2.2 retryWhen 和 repeatWhen 对比

RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作 中我们已经对repeatWhen进行了介绍,让我们再来看一下它的原理图:

repeatWhen 原理图


可以看到,retryWhenrepeatWhen最大的不同就是:retryWhen是收到onError后触发是否要重订阅的询问,而repeatWhen是通过onComplete触发。

2.3 根据 Throwable 的类型选择响应的重试策略

由于上游可以通过onError(Throwable throwable)中的异常通知retryWhen,那么我们就可以根据异常的类型来决定重试的策略。

就像我们在上面例子中做的那样,我们通过flatMap操作符获取到异常的类型,然后根据异常的类型选择动态地决定延迟重试的时间,再用Timer操作符实现延迟重试;当然,对于一些异常,我们可以直接选择不重试,即直接返回Observable.empty或者Observable.error(Throwable throwable)

更多文章,欢迎访问我的 Android 知识梳理系列:



作者:泽毛
链接:https://www.jianshu.com/p/d135f19e045c
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

这里使用平时开发过程中经常遇到的一个场景作为例子讲解,网络请求失败重试,当在业务层面,比如登录超或者token失效,需要重新获取token才能发起当前请求时,我们就可以使用retry完成该需求,这里其实涉及请求的嵌套。 首先别忘了添加rxjava相关依赖,这里使用的是Rxjava1.0: dependencies { implementation 'io.reactiv... Observable.just(1,2,3,4) .doOnNext { println("parade订阅observeOn${Thread.currentThread().name}") //当subscribeOn在Schedulers.io()时 打印parade订阅observeOnRxCachedThreadScheduler-1 //当subscribeOn在Androi..
Rxjava+ReTrofit+okHttp背景:    学习Rxjava+Retrofit+okhttp已经一段时间了,发现确实很强大,但是使用起来稍微有点麻烦,在工作中重复的代码太多,所以决定对http请求基于retrofit封装,最终效果还是比较满意,10行代码搞定预处理+加载框+错误处理+结果处理+gson转换回调!效果:优化完activity中的代码:// 完美封装简化版