相关文章推荐
坏坏的眼镜  ·  【Unity】2D ...·  2 周前    · 
温暖的西瓜  ·  许可信息公开内容·  1 年前    · 
拉风的剪刀  ·  陈浩民济公_抖抖音·  1 年前    · 
聪明的手电筒  ·  新凤霞·  1 年前    · 

Android进阶系列之第三方库知识点整理。

知识点总结,整理也是学习的过程,如有错误,欢迎批评指出。

第一篇: Rxjava2(一)、基础概念及使用
第二篇: Rxjava2(二)、五种观察者模式及背压

终于到操作符了,我觉得 rxjava2 如此好用,绝对少不了操作符的功劳,下面这张图你就简单的扫一眼,别慌,我们慢慢啃。

上一篇讲了, rxjava 有五种观察者创建模式,其中 Observable Flowable 差不多,只是 Flowable 支持背压,而其它三种,都是简化版的 Observable ,所以,本篇以 Observable 方式来讲操作符的使用。

Observable 源码

Observable 是一个抽象类,继承 ObservableSource

ObservableSource

一、创建操作符

这类操作符,创建直接返回 Observable

1.1、嵌套回调事件

1.1.1、create

create 是最常用的一个操作符,该操作符的参数中产生的 emitter 发射器,通过 onNext 不断给下游发送数据,也可以发送 onComplete onError 事件给下游。

需要发送给下游的数据,就通过emitter.onNext()给下游发送。

当发送了 onComplete 或者 onError 事件后,下游停止接收剩下的 onNext 事件

static <T> Observable<T> create(ObservableOnSubscribe<T> source)

demo:

  Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("A");
                emitter.onNext("B");
                // .....
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: s=" + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete");
            }
        });

1.2、复杂数据遍历

这类操作符,直接将一个 数组 集合 拆分成单个ObJect数据依次发送给下游,也可以直接发送Object数据。

1.2.1、just

转换一个或多个 Object 数据,依次将这些数据发射到下游。

最多接收十个 Object 参数。

A : 最多只接收十个参数。

Demo:

        Observable.just("A""B""C""D")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "accept: s=" + s);
                    }
                });
1.2.2、fromArray

直接传入一个数组数据,操作符将数组里面的元素按先后顺序依次发送给下游,可以发送十个以上的数据。

static <T> Observable<T> fromArray(T... items)

Demo:

String[] data = new String[]{"A""B""C""D""E""F""G""H""I""J""K"};
        Observable.fromArray(data)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "fromArray--accept: s=" + s
                        );
                    }
                });
1.2.3、fromIterable

直接传入一个集合数据,操作符将集合里面的元素按先后顺序依次发送给下游,可以发送十个以上的数据。

static <T> Observable<T> fromIterable(Iterable<? extends T> source)

Demo:

        List<String> mData = new ArrayList<>();
        mData.add("A");
        mData.add("B");
        mData.add("C");
        Observable.fromIterable(mData)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "fromIterable--accept: s=" + s);
                    }
                });
1.2.4、range

快速创建一个被观察者对象,连续发送一个指定开始和总数的事件序列

立即发送,无延时

static Observable<Integer> range(final int start, final int count)

Demo:

        // 从3开始发送,直到发送了十个数据停止。
        Observable.range(310).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "range--accept: integer=" + integer);
            }
        });

1.3、定时任务

1.3.1、interval

快速创建一个被观察者,延迟一定时间后再每隔指定的一个时间发送一个事件(从0开始的整数)给下游。

发送数据从0开始,依次+1整数递增

延迟时间可以为0,重载方法不设置默认使用第二个参数数值。

// initialDelay:发射第一个值需要等待时间
// period:后续每隔多少秒发射一个值
// unit:前两个参数的时间单位
Observable<Long> interval(long initialDelay, long period, TimeUnit unit)

// 两参方法
public static Observable<Long> interval(long period, TimeUnit unit) 
{
       // 第一个参数和第二个参数一致,即延迟period后再每隔period秒发送一个事件。
    // 默认使用 Schedulers.computation()
    return interval(period, period, unit, Schedulers.computation());
 }
// initialDelay:发射第一个值需要等待时间
// period:后续每隔多少秒发射一个值
// unit:前两个参数的时间单位
// scheduler:等待发生并发出项目的调度程序
static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

demo:

// 延迟2秒后发送一个事件,后续每隔五秒发送一个事件
Observable.interval(25, TimeUnit.SECONDS)
        .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                Log.d(TAG, "onNext: aLong=" + aLong);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: error" + e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
1.3.2、intervalRange

快速创建1个被观察者对象,每隔指定时间发送1个事件,可以指定事件发送开始的值和总的值。

// start:范围起始值
// count:要发出的值的总数,如果为零,则操作员在初始延迟后发出onComplete。
// initialDelay:发出第一个值(开始)之前的初始延迟
// period:后续值之间的时间段
// unit:前面时间参数单位
static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

demo:

// 第一个延迟三秒后发送int值2,后续每隔1秒累加发送给下游,一共发送10个数据。
Observable.intervalRange(21031, TimeUnit.SECONDS)
        .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                Log.d(TAG, "onNext: aLong=" + aLong);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: error" + e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });

1.4、延迟任务

1.4.1、defer

创建一个 Observable 对象,被观察者逻辑真正执行的时机是在其被订阅的时候。

当下游订阅后,上游才开始处理逻辑。

// 
static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)

demo:

String[] mStrings = new String[]{"A""B""C""D"};

Observable observable = Observable.defer(new Callable<ObservableSource<String>>() {
    @Override
    public ObservableSource<String> call() throws Exception {
        // 上游发送mStrings数组
        return Observable.fromArray(mStrings);
    }
});
// 在订阅之前,将数组数据改变
mStrings = new String[]{"defer,订阅时候才创建"};
// 订阅
observable.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "accept: s=" + s);
    }
});
1.4.2、timer

创建一个被观察者对象,上游延时指定的时间后发送一个事件到下游。

发送的数值为Long型的0

// delay:发射单个数据之前的延时
// unit:前者时间单位
// scheduler:指定的调度程序 (默认为Schedulers.computation())
static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) 

demo:

public void timer() {
        // 延迟5秒后发送Long型值0到下游,可指定Schedulers,默认Schedulers.computation()
        Observable.timer(5, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG, "accept: aLong=" + aLong);
                    }
                });
    }

二、变换操作符

2.1、变换

2.1.1、map

对上游发送的每一个事件都进行指定的函数处理,从而变换成另一个事件再发送给下游。

常使用场景:用作数据类型转换

// R:输出类型
// mapper:应用于ObservableSource发出的每个项目的函数
final <R> Observable<R> map(Function<? super T, ? extends R> mapper)

demo:

    public void map() {
        // 通过just发送整型数值1、2、3
        Observable.just(123).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                // 通过Map操作符对上游的数据进行函数处理,再转换成指定的事件发送给下游
                return integer + "变换";
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: s=" + s);
            }
        });
    }
2.1.2、flatMap

将一个发送事件的上游 Observable 变换为多个发送事件的 Observables ,然后将它们发射的事件单独做处理后再合并放进一个单独的 Observable 里发送给下游。

可以看到上游发送了三个事件(注意颜色),中间对每个事假数据进行处理后(每一个圆变成两个矩形),再合并成包含六个矩形事件的Observable对象发送给下游,注意矩形颜色,他是无规律,无序的,并不是严格按照上游发送的顺序来发送给下游。

final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)

demo:

public void flatMap() {
    // 被观察者通过just发送整型数值1、2、3
   Observable.just(123).flatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception {
            // 对收到的数值再进行函数处理。
            final List<String> list = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                list.add("变换后的数据" + integer);
            }
            // 将函数处理后的数据,在包装成一个Observable对象发送给下游。
            return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, "accept: s=" + s);
        }
    });
}
2.1.3、concatMap

flatMap 一样的功能,只是 flatMap 不能保证转换后发送给下游事件的时序,concatMap转换后能严格按照上游发送的顺序再发送给下游。

flatMap 一样,重点注意颜色,转换后颜色和上游发送的顺序一致,有序发送

final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)

demo:

public void concatMap() {
    // 被观察者通过just发送整型数值1、2、3
    Observable.just(123).concatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception {
            // 对收到的数值再进行函数处理。
            final List<String> list = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                list.add("变换后的数据" + integer);
            }
            // 将函数处理后的数据,在包装成一个Observable对象发送给下游。
            return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, "accept: s=" + s);
        }
    });
}

三、合并操作符

3.1、concat

组合多个被观察者一起发送数据,合并后 按发送顺序串行执行

组合的被观察者数量要求小于等于4个,从提供的方法参数里面可以得知。

public static <T> Observable<T> concat(
            ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
            ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)

demo:

    public void concat() {
        // 用just操作符创建三个Observable对象
        Observable<String> observable1 = Observable.just("1""2");
        Observable<String> observable2 = Observable.just("A""B""C");
        Observable<String> observable3 = Observable.just("hello""rxjava");
        // 使用concat操作符合并三个Observable对象,并将合并后的数据顺序(串行)发送给下游
        Observable.concat(observable1
                , observable2, observable3)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "accept: s=" + s);
                    }
                });
    }
3.2、concatArray

concat 一样,组合多个被观察者一起发送数据,合并后 按发送顺序串行执行

concatArray对组合的被观察者对象没有个数限制,可以大于4个。

上游发送的是一个组合的观察者数组,没有数量限制(注意颜色)

转换后串行发送(颜色和上游发送顺序对应)

static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources) 

demo:

public void concatArray() {
    Observable<String> observable1 = Observable.just("1""2");
    Observable<String> observable2 = Observable.just("A""B""C");
    Observable<String> observable3 = Observable.just("D""E");
    Observable<String> observable4 = Observable.just("F");
    Observable<String> observable5 = Observable.just("G");
    Observable.concatArray(observable1, observable2, observable3, observable4, observable5)
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG, "accept: s=" + s);
                }
            });
}
3.3、concatDelayError、concatArrayDelayError

使用concat操作符时,如果遇到其中一个被观察者发出 onError 事件则会马上终止其他被观察者的事件,如果希望 onError 事件推迟到其他被观察者都结束后才触发,可以使用对应的concatDelayError。

public static <T> Observable<T> concatDelayError(Iterable<? extends ObservableSource<? extends T>> sources) {
        ObjectHelper.requireNonNull(sources, "sources is null");
        return concatDelayError(fromIterable(sources));
    } 

public static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources) {
        return concatDelayError(sources, bufferSize(), true);
    }

public static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources, int prefetch, boolean tillTheEnd)

demo:

    public void concatArrayDelayError() {

        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("A");
                emitter.onNext("B");
                emitter.onNext("C");
                emitter.onError(new NullPointerException(""));
                emitter.onNext("D");
            }
        });


 Observable.concatArrayDelayError(observable, Observable.just("E""F"))
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: ");
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext: s="+s);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: e" + e.getMessage(), e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });
    }

可以看到,第一个observable发送到c后,手动抛出一个错误,但是并灭有影响到Observable.just("E", "F")的执行,我们依旧打印出了 E,F两个参数后才去执行我们手动抛出的NullPointerException错误

操作符这部分内容比较多,先整理这部分,后面会对其他操作符再做整理。