转载请标明出处:http://blog.csdn.net/xx326664162/article/details/51967967 文章出自: 薛瑄的博客

你也可以查看我的其他同类文章,也会让你有一定的收货!

关于RxJava,从表面上看起来很容易使用,但是如果理解不够深刻,使用过程中,往往会出现一些问题,所以我写了系列文章,从入门到精通,从简单的使用到部分源码详解,希望能给读者一个质的飞跃:
1、 RxJava之一——一次性学会使用RxJava RxJava简单的使用和使用它的好处
2、 RxJava之二——Single和Subject 与Observable举足轻重的类,虽然用的少,但应该知道
3、 RxJava之三——RxJava 2.0 全部操作符示例
4、 RxJava之四—— Lift()详解 想要了解Operators,Lift()一定要学习
5、 RxJava之五—— observeOn()与subscribeOn()的详解 Scheduler线程切换的原理
6、 RxJava之六——RxBus 通过RxJava来替换EventBus
7、 RxJava之七——RxJava 2.0 图文分析create()、 subscribe()、map()、observeOn()、subscribeOn()源码 这张图可能是全网最详细 明了的图

为什么多次调用subscribeOn()却只有第一个起作用?
为什么多次调用observeOn()却可以切换到不同线程
observeOn()后能不能再次调用subscribeOn()?

如果你有这些疑问,那接下来的内容必定能解决你心头的疑惑

subscribeOn()和observeOn()的区别

subscribeOn()和observeOn()都是用来切换线程用的

  • subscribeOn()改变调用它之前代码的线程
  • observeOn()改变调用它之后代码的线程

这里给出下面示例中用到的两个函数

//用指定的名称新建一个线程
public static Scheduler getNamedScheduler(final String name) {
        return Schedulers.from(Executors.newCachedThreadPool(new ThreadFactory() {
            @Override
            public Thread newThread(@android.support.annotation.NonNull Runnable runnable) {
                return new Thread(runnable, name);
        }));
//打印当前线程的名称
public static void threadInfo(String caller) {
        System.out.println(caller + " => " + Thread.currentThread().getName());

一、subscribeOn()

在讲解他的原理之前,先来一个简单的例子,有个感性认识,学起来更轻松

先说结论:subscribeOn 作用于该操作符之前的 Observable 的创建操符作以及 doOnSubscribe 操作符 ,换句话说就是 doOnSubscribe 以及 Observable 的创建操作符总是被其之后最近的 subscribeOn 控制 。没看懂不要紧,看下面代码和图你就懂了。

Observable
        .create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                threadInfo("OnSubscribe.call()");
                subscriber.onNext("RxJava");
        .subscribeOn(getNamedScheduler("create之后的subscribeOn"))
        .doOnSubscribe(() -> threadInfo(".doOnSubscribe()-1"))
        .subscribeOn(getNamedScheduler("doOnSubscribe1之后的subscribeOn"))
        .doOnSubscribe(() -> threadInfo(".doOnSubscribe()-2"))
        .subscribe(s -> {
            threadInfo(".onNext()");
            System.out.println(s + "-onNext");
        });

结果如下:

.doOnSubscribe()-2 => main
.doOnSubscribe()-1 => doOnSubscribe1之后的subscribeOn
OnSubscribe.call() => create之后的subscribeOn
.onNext() => create之后的subscribeOn
RxJava-onNext

3号框中的.doOnSubscribe(() -> threadInfo(".doOnSubscribe()-2")) 的之后由于没有subscribeOn操作符所以回调到该段代码被调用的线程(即主线程)

由于 subscribe 之前 没有 使用observeOn 指定Scheduler,所以.onNext()的线程是和OnSubscribe.call()使用相同的Scheduler 。

下面通过源码来分析一下:

1、示例代码:

 Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext("a");
                        subscriber.onNext("b");
                        subscriber.onCompleted();
                .subscribeOn(Schedulers.io())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                    @Override
                    public void onError(Throwable e) {
                    @Override
                    public void onNext(String integer) {
                        System.out.println(integer);
                });

运行如下:

2、subscribeOn()源代码

public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        return create(new OperatorSubscribeOn<T>(this, scheduler));

很明显,会走if之外的方法。

在这里我们可以看到,又创建了一个OperatorSubscribeOn对象,但创建时传入的参数为OperatorSubscribeOn(this,scheduler),我们看一下此对象以及其对应的构造方法

3、create()的源代码:

public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));

我们看到这个方法,使用OperatorSubscribeOn这个类,来创建一个新的Observable,那就把它叫做Observable_2,把原来的Observable叫做Observable_1

4、OperatorSubscribeOn类的源代码:

public final class




    
 OperatorSubscribeOn<T> implements OnSubscribe<T> {
    final Scheduler scheduler;
    final Observable<T> source;
    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                    });
                        });
                source.unsafeSubscribe(s);
        });
  1. OperatorSubscribeOn类implements 了Onsubscribe接口,并实现call()方法
  2. OperatorSubscribeOn的构造方法,
    • 保存了Observable对象,就是调用了subscribeOn()方法的Observable对象
    • 并保存了Scheduler对象。

这里做个总结。

把Observable.create()创建的称之为Observable_1,OnSubscribe_1。
把subscribeOn()创建的称之为Observable_2,OnSubscribe_2

  • Observable_1是由示例代码的第1、2行创建的

  • OperatorSubscribeOn类是implements Onsubscribe接口的,所以可以当做Onsubscribe类使用。(OnSubscribe_2)

  • 并且OnSubscribe_2中保存了Observable_1的应用,即source。(在OperatorSubscribeOn源代码的第8行)

  • subscribeOn()源代码的倒数第二行,create(new OperatorSubscribeOn<T>(this, scheduler))返回新创建的Observable_2对象。

4.1、分析call()方法。

  • inner.schedule()改变了线程,此时Action的call()运行在指定的线程中。
  • 示例代码中的Subscriber包装了一层,赋给对象S(Subscriber_2)。见上面代码21行。
  • source.unsafeSubscribe(s);,
    • 注意:source是Observable_1对象,这里的s就是Subscriber_2
    • 因为调用过subscribeOn(Schedulers.io())后,返回Observable_2对象,所以示例代码第13行代码的subscribe()就是Observable_2.subscribe(),也就是执行OnSubscribe_2的call()方法(即OperatorSubscribeOn类的源代码的第12行)。

4.2 看一下source.unsafeSubscribe(s);(第65行)代码都做了什么

这里的source就是Observable_1,s是Subscriber_2

unsafeSubscribe()源代码:

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            // new Subscriber so onStart it
            subscriber.onStart();
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(this, onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                hook.onSubscribeError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r;
            return Subscriptions.unsubscribed();

关键代码:

hook.onSubscribeStart(this, onSubscribe).call(subscriber);

该方法即调用了OnSubscribe_1.call()方法。

注意,此时的call()方法在我们指定的线程中运行。起到了改变线程的作用。

对于以上线程,我们可以总结,其有如下流程:

  • Observable.create() : 创建了Observable_1和OnSubscribe_1;

  • subscribeOn(): 创建Observable_2和OperatorSubscribeOn(OnSubscribe_2),同时OperatorSubscribeOn保存了Observable_1的引用。

  • 示例代码中的subscribe(Observer) 实际上就是调用Observable_2.subscribe(Observer):

  • 调用OperatorSubscribeOn的call()。call()改变了线程的运行,并且调用了Observable_1.unsafeSubscribe(s);

  • Observable_1.unsafeSubscribe(s);,该方法的实现中调用了OnSubscribe_1的call()。

这样就实现了在指定线程运行OnSubscribe的call()函数,无论我们的subscribeOn()放在哪里,他改变的是subscribe()的过程,而不是onNext()的过程。

那么如果有多个subscribeOn(),那么线程会怎样执行呢。如果按照我们的逻辑,有以下程序

Observable.




    
just("ss") 
                .subscribeOn(Schedulers.io())   // ----1---
                .subscribeOn(Schedulers.newThread()) //----2----
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                });

那么,我们根据之前的源码分析其执行逻辑。

  • Observable.just(“ss”),创建Observable,OnSubscribe

  • Observable_1.subscribeOn(Schedulers.io()):创建Observable_1,OperatorSubscribeOn_1并保存Observable的引用。

  • Observable_2.subscribeOn(Schedulers.newThread()):创建Observable_2,OperatorSubscribeOn_2并保存Observable_1的引用。

  • Observable_3.subscribe():

  • 调用OperatorSubscribeOn_2.call(),改变线程为Schedulers.newThread()。

  • 调用OperatorSubscribeOn_1.call(),改变线程为Schedulers.io()。

  • 调用OnSubscribe.call(),此时call()运行在Schedulers.io()。

根据以上逻辑分析,会按照1的线程进行执行。

二、observeOn()

先说结论:observeOn作用于该操作符之后操作符直到出现新的observeOn操作符

举个例子:

Observable.just("RxJava")
        .observeOn(getNamedScheduler("map之前的observeOn"))
        .map(s -> {
            threadInfo(".map()-1");
            return s + "-map1";
        .map( s -> {
            threadInfo(".map()-2");
            return s + "-map2";
        .observeOn(getNamedScheduler("subscribe之前的observeOn"))
        .subscribe(s -> {
            threadInfo(".onNext()");
            System.out.println(s + "-onNext");
        });

结果如下:

.map()-1 => map之前的observeOn
.map()-2 => map之前的observeOn
.onNext() => subscribe之前的observeOn
RxJava-map1-map2-onNext

下面通过源码来进行分析:

1、observeOn()源码

public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, RxRingBuffer.SIZE);
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
        return observeOn(scheduler, false, bufferSize);
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));

这里引出了lift()函数

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));

关于lift的详细介绍,如果不明白lift的原理,参考这里:RxJava 之二—— Lift()详解

用OperatorObserveOn对象,创建OnSubscribeLift对象(实现了OnSubscribe接口),接着创建Observable对象。为了加以区分,这里我们把OnSubscribeLift叫做OnSubscribe_2,Observable叫做Observable_2。

2、OperatorObserveOn代码:

public final class OperatorObserveOn<T> implements Operator<T, T> {
    private final Scheduler scheduler;
    private final boolean delayError;
    private final int bufferSize;
     * @param scheduler the scheduler to use
     * @param delayError delay errors until all normal events are emitted in the other thread?
    public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
        this(scheduler, delayError, RxRingBuffer.SIZE);
     * @param scheduler the scheduler to use
     * @param delayError delay errors until all normal events are emitted in the other thread?
     * @param bufferSize for the buffer feeding the Scheduler workers, defaults to {@code RxRingBuffer.MAX} if <= 0
    public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init(




    
);
            return parent;
    public static <T> Operator<T, T> rebatch(final int n) {
        return new Operator<T, T>() {
            @Override
            public Subscriber<? super T> call(Subscriber<? super T> child) {
                ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(Schedulers.immediate(), child, false, n);
                parent.init();
                return parent;
    /** Observe through individual queue per observer. */
    static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
        final Subscriber<? super T> child;
        final Scheduler.Worker recursiveScheduler;
        final NotificationLite<T> on;
        final boolean delayError;
        final Queue<Object> queue;
        /** The emission threshold that should trigger a replenishing request. */
        final int limit;
        // the status of the current stream
        volatile boolean finished;
        final AtomicLong requested = new AtomicLong();
        final AtomicLong counter = new AtomicLong();
         * The single exception if not null, should be written before setting finished (release) and read after
         * reading finished (acquire).
        Throwable error;
        /** Remembers how many elements have been emitted before the requests run out. */
        long emitted;
        // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
        // not prevent anything downstream from consuming, which will happen if the Subscription is chained
        public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
            this.child = child;
            this.recursiveScheduler = scheduler.createWorker();
            this.delayError = delayError;
            this.on = NotificationLite.instance();
            int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
            // this formula calculates the 75% of the bufferSize, rounded up to the next integer
            this.limit = calculatedSize - (calculatedSize >> 2);
            if (UnsafeAccess.isUnsafeAvailable()) {
                queue = new SpscArrayQueue<Object>(calculatedSize);
            } else {
                queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
            // signal that this is an async operator capable of receiving this many
            request(calculatedSize);
        void init() {
            // don't want this code in the constructor because `this` can escape through the 
            // setProducer call
            Subscriber<? super T> localChild = child;
            localChild.setProducer(new Producer() {
                @Override
                public void request(long n) {
                    if (n > 0L) {
                        BackpressureUtils.getAndAddRequest(requested, n);
                        schedule();
            });
            localChild.add(recursiveScheduler);
            localChild.add(this);
        @Override
        public void onNext(final T t) {
            if (isUnsubscribed() || finished) {
                return;
            if (!queue.offer(on.next(t))) {
                onError(new MissingBackpressureException());
                return;
            schedule();
        @Override
        public void onCompleted() {
            if (isUnsubscribed() || finished) {
                return;
            finished = true;
            schedule();
        @Override
        public void onError(final Throwable e) {
            if (isUnsubscribed() || finished) {
                RxJavaHooks.onError(e);
                return;
            error = e;
            finished = true;
            schedule();
        protected void schedule()




    
 {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(this);
        // only execute this from schedule()
        @Override
        public void call() {
            long missed = 1L;
            long currentEmission = emitted;
            // these are accessed in a tight loop around atomics so
            // loading them into local variables avoids the mandatory re-reading
            // of the constant fields
            final Queue<Object> q = this.queue;
            final Subscriber<? super T> localChild = this.child;
            final NotificationLite<T> localOn = this.on;
            // requested and counter are not included to avoid JIT issues with register spilling
            // and their access is is amortized because they are part of the outer loop which runs
            // less frequently (usually after each bufferSize elements)
            for (;;) {
                long requestAmount = requested.get();
                while (requestAmount != currentEmission) {
                    boolean done = finished;
                    Object v = q.poll();
                    boolean empty = v == null;
                    if (checkTerminated(done, empty, localChild, q)) {
                        return;
                    if (empty) {
                        break;
                    localChild.onNext(localOn.getValue(v));
                    currentEmission++;
                    if (currentEmission == limit) {
                        requestAmount = BackpressureUtils.produced(requested, currentEmission);
                        request(currentEmission);
                        currentEmission = 0L;
                if (requestAmount == currentEmission) {
                    if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                        return;
                emitted = currentEmission;
                missed = counter.addAndGet(-missed);
                if (missed == 0L) {
                    break;
        boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
            if (a.isUnsubscribed()) {
                q.clear();
                return true;
            if (done) {
                if (delayError) {
                    if (isEmpty) {
                        Throwable e = error;
                        try {
                            if (e != null) {
                                a.onError(e);
                            } else {
                                a.onCompleted();
                        } finally {
                            recursiveScheduler.unsubscribe();
                } else {
                    Throwable e = error;
                    if (e != null) {
                        q.clear();
                        try {
                            a.onError(e);
                        } finally {
                            recursiveScheduler.unsubscribe();
                        return true;
                    } else
                    if (isEmpty) {
                        try {
                            a.onCompleted();
                        } finally {
                            recursiveScheduler.unsubscribe();
                        return true;
            return false;

虽然代码很长,但是也就是三部分

  • 构造函数,
  • 实现Operator所继承的Func1中的call()函数
  • 静态内部类ObserveOnSubscriber< T>

下面来逐一分析:

因为调用Observable.等函数而需要创建的称之为Observable_1,Subscriber_1。
因为调用observeOn()而创建的称之为Observable_2,Subscriber_2

2.1、创建OperatorObserveOn对象

上面这段代码,主要功能就是创建OperatorObserveOn对象

既然是Operator,那么它的职责就是把一个Subscriber转换成另外一个Subscriber,

2.2、OperatorObserveOn对象中的call()函数返回ObserveOnSubscriber对象(Subscriber_2)

我们来看下call函数都做了什么:

public void onNext(final T t) {
    if (isUnsubscribed() || finished) {
        return;
    if (!queue.offer(on.next(t))) {
        onError(new MissingBackpressureException());
        return;
    schedule();

这里做了两件事,

  1. 把执行的结果缓存到一个队列里,这里的on对象,不是Subscriber_1。
  2. 调用schedule()启动传入的线程所创建的worker

2.3、schedule()代码:

protected void schedule() {
    if (counter.getAndIncrement() == 0) {
        recursiveScheduler.schedule(this);
  • recursiveScheduler 就是之前我们传入的Scheduler,就是在observeOn()传入的指定线程,例如:AndroidScheluders.mainThread()

2.4、我们看下在scheduler()中调用的call()方法代码,call()方法只能由scheduler()去调用执行

@Override
public void call() {
    ...
    final Subscriber<? super T> localChild = this.child;
    for (;;) {
        ...
        boolean done = finished;
        Object v = q.poll();
        boolean empty = v == null;
        if (checkTerminated(done, empty, localChild, q)) {
            return;
        if (empty) {
            break;
        localChild.onNext(localOn.getValue(v));
        ...
    if (emitted != 0L) {
        request(emitted);

OK,在Scheduler启动后, 我们在Observable.subscribe(a)传入的a就是这里的localChild(即Subscriber_1,是在第35行代码传递进来的) , 我们看到,在call中终于调用了它的onNext方法,把真正的结果传了出去,此时是工作在observeOn()指定的线程。

那么总结起来的结论就是:

  • observeOn 对调用之前的序列默不关心,也不会要求之前的序列运行在指定的线程上
  • observeOn 对之前的序列产生的结果先缓存起来,然后再在指定的线程上,推送给最终的subscriber

下面给出两次调用observeOn()的示意图

我们经常多次使用subscribeOn()切换线程,那么以后是否可以组合observeOn()和subscribeOn()达到自由切换的目的呢?

subscribeOn()改变的是subscribe()这句调用所在的线程,大多数情况,产生内容和消费内容是在同一线程的,所以改变了产生内容所在的线程,就改变了消费内容所在的线程。

  • 对subscribeOn()的调用是自下向上,所以连续多次调用subscribeOn(),结果会被最上面的subscribeOn()覆盖。(生成和消费都会被覆盖)

  • observeOn()之上有subscribeOn()调用
    observeOn()的工作原理是把消费结果先缓存,再切换到新线程上让原始消费者消费,它和生产者是没有一点关系的,就算subscribeOn()调用了,也只是改变observeOn()这个消费者所在的线程,和OperatorObserveOn中存储的原始消费者一点关系都没有,它还是由observeOn()控制。

  • observeOn()之下有subscribeOn()调用
    这也不会改变observeOn()所指定的消费线程,因为observeOn()是自上而下调用,对subscribeOn()的调用是自下向上,在observeOn()指定的线程会覆盖下面subscribeOn()指定线程来去消费

用一张图来解释当多个 subscribeOn() 和 observeOn() 混合使用时,线程调度是怎么发生的(由于图中对象较多,相对于上面的图对结构做了一些简化调整):

参考:http://blog.csdn.net/jdsjlzx/article/details/51685769
http://blog.csdn.net/jdsjlzx/article/details/51686152
https://segmentfault.com/a/1190000004856071
https://gank.io/post/560e15be2dca930e00da1083
你真的会用RxJava么?RxJava线程变换之observeOn与subscribeOn

关注我的公众号,轻松了解和学习更多技术
这里写图片描述

转载请标明出处:http://blog.csdn.net/xx326664162/article/details/51967967 文章出自:薛瑄的博客你也可以查看我的其他同类文章,也会让你有一定的收货!为什么多次调用subscribeOn()却只有第一个起作用? 为什么多次调用observeOn()却可以切换到不同线程 observeOn()后能不能再次调用subscribeOn()?如果
RxJava简介 RxJava是现在大火的一个异步框架,他代码简洁好懂,功能强大,扩展性高,维护难度小。RxJava最吸引人的功能就是它的线程切换功能。在Android开发中,UI组件只能在主线程中进行,但是主线程中执行太复杂的逻辑又会导致APP卡顿,因此灵活的线程切换是一个安卓开发工程师的必会技能。RxJava提供了优秀的线程切换能力,能在不同的线程执行规定的逻辑代码。 这次就来聊一聊RxJ...
转载自安德雷士的RxJava线程切换——ObserveOnSubscribeOn的区别 RxJava很优势的一个方面就是他的线程切换,基本是依靠ObserveOnSubscribeOn这两个操作符来完成的。 先来看看什么是ObserveOnSubscribeOn,官方对他们的定义是这样的: ObserveOn specify the Scheduler on which an observ...
RxJava 是一个响应式编程框架,里面代码比较复杂,本系列文章将从以下几个角度来分析这个框架。 1. RxJava 的链路调用流程。 2. RxJava 的常用操作符 `map、flatmap`。 3. RxJava线程调度。 4. 自己实现一个简易版的响应式框架。
observeOn 指定一个观察者在哪个调度器上观察这个Observable,当每次调用了ObservableOn这个操作符时,之后都会在选择的调度器上进行观察,直到再次调用ObservableOn切换了调度器。 多次使用 subscribeOn //subscribeOn : 切换订阅...
RxJava有引以为傲的链式调用、海量的操作符、便捷的线程切换。如果找到一个入口同时了解三者,线程切换再合适不过了,它同时利用了三者的特性,又不至于内容过于琐碎繁杂。 RxJava切换线程的操作符总共只有两个:observeOnsubscribeOn: observeOn用于切换观察者Observer的onNext执行线程 subscribeOn用于切换被观察者Observable的subscribe执行线程 如下这段代码就是本篇分析的重点,很好地体现了链式调用的特点:不加分号,一点到底。(为了减小
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; 从上图中,我们可以看出,observeOn主要作的工作是: 1,通过指定的scheduler来切换线程,用来emit数据,这个数据就是onNext(data)方法的参数。 2,emit出来的数据,先异步的缓存到一个buffer,实际上是缓存到了一个Queue中。 3,通过指定的scheduler来异步的消费Q //获取当前的线程name(是主线程还是异步线程) private String getCurrentThread() { return Thread.currentThread().getName(); 1.默认线程 上下游默认都是主线程 2.为上游设置异步线程,下游设置主线程 //上游配置异步线程,下游配置主线程RxJava内部是一个线程池 Observable.create
使用RxJavasubscribeOn和observeOn可以方便地进行线程切换,但我发现很多人由于对subscribeOn的理解不到位,在使用中会发生意想不到的bug。 subscribeOn 提起subscribeOn,很多人都知道它可以用来切换上游线程,且只有第一次生效。这种理解明显是带有错误的,看一段代码 val observable = Observable.create<Int> { emitter -> log("Subscribing") thread(.