前言:不要在该奋斗的年纪去选择安逸享乐,不是每个人都能成为自己想要的样子,但是每个人都能努力成为自己想要的样子,相信自己,你能作茧自缚,最终也能破茧成蝶。                                                              —— 致正在努力奋斗打拼的人

RxJava如此受欢迎,是因为提供了丰富且功能强大的操作符,几乎能完成所有的功能需求。下面我们会继续学习其他操作符。RxJava2在第一篇文章作了详细的介绍,是一个基于事件流的异步操作库。第二篇文章中讲解了RxJava的变换操作符,组合操作符,合并操作符 (链接和源码在文章最后给出) ;如果不了解Rxjava2可以参考下 RxJava2最全面、最详细的讲解(一) ,如果英文比较好而且有兴趣的可以到官网学习: ReactiveX的官方网站 。下面开始继续讲解RxJava2的其他用法。

二、延迟操作符

1.delay()

延迟一定时间 Observable (被观察者)再发送事件, delay() 有个多个重载的方法:

delay()的构造方法:

delay(long delay, TimeUnit unit)
delay(long delay, TimeUnit unit, boolean delayError)
delay(long delay, TimeUnit unit, Scheduler scheduler)
delay(long delay, TimeUnit unit, Scheduler scheduler, boolean delayError)

  • delay: 延迟的时间,Long类型;
  • unit: 事件单位,TimeUnit.SECONDS等类型;
  • delayError: 是否开启错误延迟,boolean类型;
  • scheduler: 调度器,用于切换线程。

如果存在错误或异常事件,则正常执行,执行后再抛出异常, error 事件后的其他事件不会再被执行。

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) {
            e.onNext(1);
            e.onNext(2);
            e.onError(new Exception("抛出异常"));
            e.onNext(3);
            e.onComplete();
            //延时2秒,时间单位为秒,开启错误回调延时
            .delay(2, TimeUnit.SECONDS, true)
            .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "delay:onSubscribe == 订阅");
        @Override
        public void onNext(Integer integer) {
            Log.e(TAG, "delay:onNext == " + integer);
        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "delay:onError == " + e.getMessage());
        @Override
        public void onComplete() {
            Log.e(TAG, "delay:onComplete == ");

上面例子中,发送1,2,抛出异常,3共四个事件,设置2秒延时,同时开启delayError错误延时,打印数据如下:

三、do操作符

1.doOnNext()与doOnEach()与doAfterNext()

  • doOnNext():     每个事件都会执行,执行next()事件前执行;
  • doOnEach():    每个事件都会执行,当Observable每次发送一个数据事件都会调用(onNext()前回调);
  • doAfterNext():  每个事件都会执行,执行next()事件后执行。
 Observable.just(1, 2, 3)
         //执行next()事件前执行
         .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.e(TAG, "doOnNext 事件前执行== " + integer);
            //当Observable每次发送一个数据事件都会调用(onNext()前回调)
            .doOnEach(new Consumer<Notification<Integer>>() {
        @Override
        public void accept(Notification<Integer> integerNotification) {
            Log.e(TAG, "doOnEach 每次都执行== " + integerNotification.getValue());
            //执行next()事件后执行
            .doAfterNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) {
            Log.e(TAG, "doAfterNext 事件后执行== " + integer);
            .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "doNext:onSubscribe == 订阅");
        @Override
        public void onNext(Integer integer) {
            Log.e(TAG, "doNext:onNext == " + integer);
        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "doNext:onError == " + e.getMessage());
        @Override
        public void onComplete() {
            Log.e(TAG, "doNext:onComplete == ");

这里发送了1,2,3共三个事件,分别测试了doOnNext()doOnEach()doAfterNext(),打印log如下:

2.doOnError()与doAfterTerminate()

  • doOnError():           Observable发送错误事件时调用;
  • doAfterTerminate():Observable发送事件完毕后,无论是正常发送还是异常终止都会执行。
Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) {
            e.onNext(1);
            e.onError(new Exception("抛出异常"));
            e.onNext(2);
            e.onComplete();
            //Observable发送错误事件时调用
            .doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.e(TAG, "doOnError 发送错误事件== " + throwable.getMessage());
            //Observable发送事件完毕后,无论是正常发送还是异常终止都会执行
            .doAfterTerminate(new Action() {
        @Override
        public void run() throws Exception {
            Log.e(TAG, "doAfterTerminate == 事件结束");
            .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "doOnError:onSubscribe == 订阅");
        @Override
        public void onNext(Integer integer) {
            Log.e(TAG, "doOnError:onNext == " + integer);
        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "doOnError:onError == " + e.getMessage());
        @Override
        public void onComplete() {
            Log.e(TAG, "doOnError:onComplete == ");

上面例子中,发送事件中途抛出异常,调用doOnError()doAfterTerminate()方法,打印数据如下:

3.doOnSubscribe()与doOnComplete()与doFinally()

  • doOnSubscribe():观察者订阅时调用;
  • doOnComplete():Observable正常发送事件完成后调用;
  • doFinally():          最后执行。
Observable.just(1, 2, 3)
             //观察者订阅时调用
             .doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) {
            Log.e(TAG, "doOnSubscribe == 订阅时执行");
            //Observable正常发送事件完成后调用
            .doOnComplete(new Action() {
        @Override
        public void run() {
            Log.e(TAG, "doOnComplete == 事件完成执行");
            //最后执行
            .doFinally(new Action() {
        @Override
        public void run() {
            Log.e(TAG, "doFinally == 最后执行");
            .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "concat:onSubscribe == 订阅");
        @Override
        public void onNext(Integer integer) {
            Log.e(TAG, "concat:onNext ==" + integer);
        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "concat:onError == " + e.getMessage());
        @Override
        public void onComplete() {
            Log.e(TAG, "concat:onComplete == ");

上面例子中,发送了1,2,3共三个事件,分别调用doOnSubscribe()doOnComplete()doFinally()方法,打印log如下:

关于do相关的操作符总结一下:

创建类型作用使用场景
do相关操作符doOnNext()执行onNext()事件前调用

在事件生命周期伴随事件

回调执行其他的相关逻辑

doOnEach()当Observable每发送一次数据就调用一次
doAfterNext()执行onNext()事件后调用
doOnError()发送错误事件时调用
doAfterTerminate()无论是正常发送事件完毕/异常终止都回调
doOnSubscribe()观察者订阅时回调
doOnComplete()正常发送事件完毕后
doFinally()最后执行

四、错误处理操作符

1.onErrorReturn()

在被观察者Observable发生错误或者异常的时候,拦截错误并执行指定逻辑,返回一个与源Observable相同类型的结果(onNext()),最后回调onComplete()方法。简单来说,Observable发生错误或异常时发送一个相同类型的特殊事件,并且终止事件发送。

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            //发送1~10共10个事件
            for (int i = 0; i < 10; i++) {
                if (i == 5) {//i==5抛出异常
                    e.onError(new Exception("抛出异常"));
                e.onNext(i);
            //在Observable发生错误或者异常的时候,拦截错误并执行指定逻辑,
            //返回一个与源Observable相同类型的结果(onNext()),最后回调onConplete()方法。
            .onErrorReturn(new Function<Throwable, Integer>() {
        @Override
        public Integer apply(Throwable throwable) throws Exception {
            return 404;
            .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "concat:onSubscribe == 订阅");
        @Override
        public void onNext(Integer integer) {
            Log.e(TAG, "concat:onNext ==" + integer);
        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "concat:onError == " + e.getMessage());
        @Override
        public void onComplete() {
            Log.e(TAG, "concat:onComplete == ");

上面例子中,发送1~10共10个事件,当发送第五个事件时,抛出异常,onErrorReturn()回调的方法就会执行发送同类型的数据onNext(404)给订阅者,订阅者Observer接收404并且回调onComplete()方法,打印log如下:

2.onErrorResumeNext()

在被观察者Observable发生错误或者异常的时候,拦截错误并执行指定逻辑,返回一个与源Observable相同类型的新的Observable被观察者,最后回调onComplete()方法。

onErrorResumeNext()onErrorReturn()类似,区别在于:

  • onErrorReturn():          返回的是一个与源Observable相同类型的结果(onNext())
  • onErrorResumeNext():返回的是与源Observable相同类型的新的Observable,也就是说可以返回多个相同类型的Observable。
Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            //发送2个事件
            e.onNext("一");
            e.onNext("二");
            //抛出异常
            e.onError(new Exception("抛出异常"));
            //拦截错误并执行指定逻辑,返回Observable
            .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends String>>() {
        @Override
        public ObservableSource<? extends String> apply(Throwable throwable) {
            return Observable.just("404", "405", "406");
            .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "onErrorResumeNext:onSubscribe == 订阅");
        @Override
        public void onNext(String s) {
            Log.e(TAG, "onErrorResumeNext:onNext ==" + s);
        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onErrorResumeNext:onError == " + e.getMessage());
        @Override
        public void onComplete() {
            Log.e(TAG, "onErrorResumeNext:onComplete == ");

这里发送了“一”,“二”两个事件后,抛出异常,在onErrorResumeNext()回调方法中返回3个相同类型的Observable:“404”,“405”,“406”,打印结果如下:

3.onExceptionResumeNext()

在被观察者Observable发生异常的时候,返回一个与源Observable相同类型的新的Observable被观察者,拦截异常并执行指定逻辑,终止事件发送。

onErrorResumeNext()onErrorReturn()类似,区别在于:

  • onErrorResumeNext():          拦截的是错误Error和异常Exception,回调onConplete()
  • onExceptionResumeNext():  只能拦截异常Exception,不回调onConplete()

注意:这里仅仅是发生异常Exception的时候回调,发送错误Error并不会回调。

这里普及一个知识,如果熟悉的可以跳过,Java的异常分为错误(Error)和异常(Exception)两种,他们都继承自Throwable

错误(Error):一般是比较严重的系统问题,比如我们经常遇到的OutOfMemoryError、StackOverflowError等错误。错误一般继承与Error,Error继承与Throwable,如果需要捕获错误则需要使用try…catch(Error e)或者try…catch(Throwable t),使用try…catch(Exception e)句式无法捕获异常。

异常(Exception):一般是根据实际处理业务抛出的异常,分为运行时异常(RuntimeException)和普通异常,普通异常直接继承Exception类,如果内部方法没有try…catch(Exception e)句式,就必须通过throws关键字将异常抛出外部处理(即checked异常),而运行时异常继承RuntimeException类,如果内部没有设置try…catch(Exception e)处理,不需要显示通过throws关键字抛出外部,如NullPointerException、IndexOutOfBoundsException等运行时异常,当然RuntimeException继承自Exception,异常(Exception)继承自Throwable,可以通过try…catch(Exception e)捕获异常。

Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            //发送2个事件
            e.onNext("事件1");
            e.onNext("事件2");
            //抛出异常
            e.onError(new Exception("抛出异常"));
            //拦截异常并执行指定逻辑,返回多个Observable执行的事件
            .onExceptionResumeNext(new ObservableSource<String>() {
        @Override
        public void subscribe(Observer<? super String> observer) {
            observer.onNext("404");
            observer.onNext("405");
            observer.onNext("406");
            .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "onExceptionResumeNext:onSubscribe == 订阅");
        @Override
        public void onNext(String s) {
            Log.e(TAG, "onExceptionResumeNext:onNext ==" + s);
        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onExceptionResumeNext:onError == " + e.getMessage());
        @Override
        public void onComplete() {
            Log.e(TAG, "onExceptionResumeNext:onComplete == ");

这里发送了"事件1","事件2"后抛出异常,执行onExceptionResumeNext()回调方法,观察者Observer重新发送“404”,“405”,“406”三个事件的结果,打印log如下:

4.retry()

当被观察者Observable发生错误或者异常时,重新尝试执行Observable的逻辑,如果经过n次重试后仍然出现错误或者异常,最后回调onError()方法,如果没有错误或者异常,则按照正常的流程走。简单来说就是出现错误或者异常的时候,让被观察者重新发送数据,回调onError方法。

注意:如果没有指定重试次数,则retry()会一直无限重试。

retry()的相关构造方法:
//无限次重试
retry();
//指定重试次数
retry(long times);
//无限重试,返回当前重试的次数和异常信息
retry(BiPredicate<? super Integer, ? super Throwable> predicate);
//指定重试次数,返回异常信息
retry(long times, Predicate<? super Throwable> predicate);
//无限重试,返回异常信息
retry(Predicate<? super Throwable> predicate);

相关参数解析:

  • times:                                                                                     设置的重试次数,Long类型;
  • Predicate<? super Throwable> predicate:                             存放的错误或者异常信息;
  • BiPredicate<? super Integer, ? super Throwable> predicate:错误异常信息,Integer表示当前重试的次数为第几次。
Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) {
            e.onNext(1);
            e.onNext(2);
            e.onError(new Exception("抛出异常"));
            //重新发送数据,这里重发2次,带有异常信息回调
            .retry(2, new Predicate<Throwable>() {
        @Override
        public boolean test(Throwable throwable) {
            //false: 不重新发送数据,回调Observer的onError()方法结束
            //true: 重新发送请求(最多发送2次)
            return true;
            .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "retry:onSubscribe == 订阅");
        @Override
        public void onNext(Integer integer) {
            Log.e(TAG, "retry:onNext ==" + integer);
        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "retry:onError == " + e.getMessage());
        @Override
        public void onComplete() {
            Log.e(TAG, "retry:onComplete == ");

这里发送事件1,2后“抛出异常”事件,retry()重写方法中返回的boolean值表示是否发送重试请求,那么这里会重试2次后回调onError()方法,打印数据如下:

5.retryWhen()

在源被观察者Observable出现错误或者异常时,通过回调新的Observable来判断是否重新尝试执行源Observable的逻辑, 如果第二个Observable没有出现错误或者异常,就会重新尝试执行源Observable的逻辑,否则会直接回调订阅者的onError()方法。

retryWhen()retry()类似,简单来说,遇到错误或异常时,将错误发送给新的Observable被观察者,并决定是否需要原来的Observable重新订阅并且发送事件。

 Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) {
            e.onNext(66);
            e.onNext(88);
            e.onError(new Exception("抛出异常"));
            //发生错误或者异常时,重试,将原来Observable的错误或者异常事件,转换成新的Observable
            //如果新的Observable返回了onError()事件,则不再重新发送原来Observable的数据
            //如果新的Observable返回onNext事件,则会重新发送原来Observable的数据
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Observable<Throwable> throwableObservable){
            return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Throwable throwable) throws Exception {
                    //新观察者Observable发送onNext()事件,则源Observable重新发送数据。如果持续遇到错误则持续重试。
                    //return Observable.just(1);
                    //回调onError()事件,并且接收传过去的错误信息
                    return Observable.error(new Exception("抛出异常2"));
            .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "retryWhen:onSubscribe == 订阅");
        @Override
        public void onNext(Integer integer) {
            Log.e(TAG, "retryWhen:onNext ==" + integer);
        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "retryWhen:onError == " + e.getMessage());
        @Override
        public void onComplete() {
            Log.e(TAG, "retryWhen:onComplete == ");

(1)新的被观察者Observable发送onError()事件,打印log如下:

(2)新的被观察者Observable发送其他事件,发送的数据不重要,只是新的Observable通知原来的Observable重新发数据,打印log如下:

6.repeat()

repeat()表示重复发送被观察者事件。可设置重复的次数,如果不设置参数则会无限次重复发送。注意:如果中途抛出错误或者异常,则无法重复发送。

Observable.just(1,2)
        //重复2次
        .repeat(2)
               .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "repeat:onSubscribe == 订阅");
        @Override
        public void onNext(Integer integer) {
            Log.e(TAG, "repeat:onNext ==" + integer);
        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "repeat:onError == " + e.getMessage());
        @Override
        public void onComplete() {
            Log.e(TAG, "repeat:onComplete == ");

这里重复发送事件队列1,2,重复两次,打印log如下:

6.repeatWhen()

repeatWhen()是指有条件,重复性发送原来的被观察者Observable事件;在回调方法中创建新的Observable,通过新的observable是否重新订阅和发送事件。

注意:如果新的Observable被观察者返回onComplete()onError()事件,则不重新订阅、发送源Observable的事件;如果新的Observable被观察者返回其他事件,则重新订阅、发送源Observable的事件。

Observable.just(1004,1005)
            .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
        //如果新的Observable被观察者返回onComplete()或onError()事件,则不重新订阅、发送源Observable的事件
        //如果新的Observable被观察者返回其他事件,则重新订阅、发送源Observable的事件
        @Override
        public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
            //flatMap用于接收上面的数据
            return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Object o) throws Exception {
                    //通知原来的Observable,重新订阅和发送事件(发送什么数据不重要,这里仅做通知使用)
                    //return Observable.just(1);
                    //等于发送onComplete()方法,但是不会回调Observer的onComplete()
                    //return Observable.empty();
                    //回调onError()事件,并且接收传过去的错误信息
                    return Observable.error(new Exception("抛出异常"));
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "repeatWhen:onSubscribe == 订阅");
        @Override
        public void onNext(Integer integer) {
            Log.e(TAG, "repeatWhen:onNext ==" + integer);
        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "repeatWhen:onError == " + e.getMessage());
        @Override
        public void onComplete() {
            Log.e(TAG, "repeatWhen:onComplete == ");

发送其他事件Observable.just(1),会重复通知原来的Observable被观察者重新订阅和发送是事件,打印数据如下:

发送Observable.empty(),不会回调onComplete()方法;

发送Observable.error(new Exception("抛出异常")),回调onError()事件,并且接收传过去的错误信息,打印log如下:

错误处理操作符总结一下:

创建类型作用使用场景
错误处理操作符onErrorReturn()Observable发生错误或异常时发送一个相同类型的特殊事件,并且终止事件发送。

遇到错误异常时的解决机制

onErrorResumeNext()Observable发生错误或异常时产生新的Observable发送一个特殊事件,并且终止事件发送。
onExceptionResumeNext()同上,不同点:这里只能拦截异常不能拦截错误
retry()发生异常或错误时重试源Observable的请求
retryWhen()同上,不同点:将错误异常传递给新的Observable通知源Observable
repeat()重复源Observable发送事件(抛出异常错误不能重复发送)
repeatWhen()在回调方法中创建新的Observable,通过新的observable是否重复订阅和发送事件。

至此,本文结束!其他相关用法请留意下一篇文章!

源码地址:https://github.com/FollowExcellence/Rxjava_Retrofit

点关注,不迷路

好了各位,以上就是这篇文章的全部内容了,能看到这里的人呀,都是人才。

我是suming,感谢各位的支持和认可,您的点赞、评论、收藏【一键三连】就是我创作的最大动力,我们下篇文章见!

如果本篇博客有任何错误,请批评指教,不胜感激 !

要想成为一个优秀的安卓开发者,这里有必须要掌握的知识架构,一步一步朝着自己的梦想前进!Keep Moving!

相关文章:

Retrofit2详解和使用(一)

  • Retrofit2的介绍和简单使用

OKHttp3的使用和详解

  • OKHttp3的用法介绍和解析

OKHttp3源码详解

  • 从源码角度解释OKHttp3的关键流程和重要操作

RxJava2详解(一)

  • 详细介绍了RxJava的使用(基本创建、快速创建、延迟创建等操作符)

RxJava2详解(二)

  • RxJava转换、组合、合并等操作符的使用

RxJava2详解(三)

  • RxJava延迟、do相关、错误处理等操作符的使用

RxJava2详解(四)

  • RxJava过滤、其他操作符的使用

上述几篇都是android开发必须掌握的,后续会完善其他部分!

前言:不要在该奋斗的年纪去选择安逸享乐,不是每个人都能成为自己想要的样子,但是每个人都能努力成为自己想要的样子,相信自己,你能作茧自缚,最终也能破茧成蝶。 —— 致正在努力奋斗打拼的人一、概述RxJava如此受欢迎,是因为提供了丰富且功能强大的操作符,几乎能完成所有...
RxJava是一套响应式编程技术,实现业务和逻辑上的链式编程。RxJavaRxJava2的使用会有一些区别,这篇博客基于RxJava2。我将总结一下RxJava2的使用,整理一下RxJava2常用的操作符,针对每个操作符,都会有相应的代码,帮助我们理解。 一.RxJava概论 RxJava也是基于观察者模式:被观察者完成计算任务,观察者根据被观察者的计算结果作...
版权声明:未经玉刚说许可,不得以任何形式转载 0. 简介 RxJava 其实就是提供一套异步编程的 API,这套 API 是基于观察者模式的,而且是链式调用的,所以使用 RxJava 编写的代码的逻辑会非常简洁。 RxJava 有以下个基本的元素: 被观察者(Observable) 观察者(Observer... RxJava2在第一篇文章中基本用法作了详细的介绍,是一个基于事件流的异步操作库。相信大家对RxJava有了一定的理解,由于篇幅过长所以重新写了一篇,如果不了解Rxjava2可以参考下RxJava2最全面、最详细讲解(一)。下面开始继续讲解RxJava2的其他用法。(源码和其他链接在文章最后给出) 在使用...
RxJava中,函数响应式编程具体表现为一个观察者(Observer)订阅一个可观察对象(Observable),通过创建可观察对象发射数据流,经过一系列操作符(Operators)加工处理和线程调度器(Scheduler)在不同线程间的转发,最后由观察者接受并做出响应的一个过程 ObservableSource与Observer是RxJava2中最典型的一组观察者与可观察对象的组合,其他四组...
前言在上一篇博客中,提到了RxJava的一些比较核心的东西,还有与1.x版本的一些区别! 现在我们具体了解一下它的使用使用最基本的的使用我们知道一个简单的RxJava的应用,需要一个观察者或者订阅者Observer,一个被观察者Observable,最后调用subscribe()方法将两者绑定起来! 示例://创建观察者或者订阅者 Observer observer = new
提高开发效果,降低维护成本一直是团队追求的宗旨。现在安卓里面RxJava2+Retrofit+OKHttp的网络请求框架是最流行的,几乎一统江湖,配合响应式式编程的RxJava使用也越来越多。如果英文比较好而且有兴趣的可以到官网学习:ReactiveX的官方网站。(源码和其他相关链接在文章最后给出) RxJava到底是什么?       RxJava在Git...
RxJava2是一个流式编程库,适用于异步和事件驱动程序的开发。以下是RxJava2的一些应用场景: 1. 网络请求和数据处理:RxJava2可以很方便地处理网络请求和响应,同时还能够对返回的数据进行各种处理和转换,例如过滤、转换、聚合、缓存等操作。 2. 响应式UI:在Android开发中,RxJava2可以很方便地实现响应式UI,例如对于EditText的监听,ListView和RecyclerView的滚动事件监听等。 3. 异步操作:RxJava2可以让开发人员更加方便地进行异步操作,例如多个异步任务的组合、并行执行等。 4. 事件总线:RxJava2可以作为一个轻量级的事件总线,方便地实现模块之间的通信。 总之,RxJava2适用于需要处理异步和事件驱动程序的场景,可以提高程序的可读性和可维护性,减少回调地狱等问题。