RxJava线程切换原理

RxJava在圈子里越来越火,相信很大的一个原因就是它的线程切换。它的线程切换可以用优雅来形容,链式调用,简单、方便。今天,就让我们来窥探一下RxJava的线程切换原理。本次拆轮子,还是按原样,通过小例子,研读RxJava源码等来理解整个过程、结构、原理,我们首要的是先对线程切换的原理有个全局的概览,细节再慢慢来深入。

前言

线程的切换都是通过subscribeOn或者observeOn来进行,生产者的执行线程只受subscribeOn控制,不受observeOn影响。subscribeOn指定的线程环境能一直维持到第一次observeOn出现之前。要讲线程切换原理之前,我们先来看一下下面的几个类定义:

  • Operator
/**
     * Operator function for lifting into an Observable.
     */
    public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
    }

Operator是Observable中定义的接口,即用户的逻辑操作,RxJava框架会调用lift方法将Operator包装成为Observable。

  • ObserveOnSubseriber

ObserveOnSubseriber是被订阅者的类,处理用户数据逻辑,也即生产者,用来产生用户数据的。

  • OperatorObserveOn

OperatorObserveOn是订阅者的类,接收数据的,也即消费者,消费生产者发送过来的数据。

  • Worker

Worker是线程真正执行的地方,也就是单独新建的一个线程或线程池中的某个线程。

原理解析

我们先创建一个Observable:

Observable.just(null)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe();

just方法又是调用的ScalarSynchronousObservable,然后new一个OnSubscribe作为构造函数的参数,暂且叫做1号OnSubscribe,这个下面会再提到,也是线程切换的区别所在:

protected ScalarSynchronousObservable(final T t) {
        super(new OnSubscribe<T>() {

            @Override
            public void call(Subscriber<? super T> s) {
                s.onNext(t);
                s.onCompleted();
            }

        });
        this.t = t;
    }

线程切换的要点在lift()函数里面,都是基于同一个基础的变换方法: lift(Operator)

先来看一下它的源码:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    try {
                        // new Subscriber created and being subscribed with so 'onStart' it
                        st.onStart();
                        onSubscribe.call(st);
                    } catch (Throwable e) {
                        // localized capture of errors rather than it skipping all operators
                        // and ending up in the try/catch of the subscribe method which then
                        // prevents onErrorResumeNext and other similar approaches to error handling
                        if (e instanceof OnErrorNotImplementedException) {
                            throw (OnErrorNotImplementedException) e;
                        }
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    if (e instanceof OnErrorNotImplementedException) {
                        throw (OnErrorNotImplementedException) e;
                    }
                    // if the lift function failed all we can do is pass the error to the final Subscriber
                    // as we don't have the operator available to us
                    o.onError(e);
                }
            }
        });
    }

我们可以看到,调用lift()方法后(即执行subscribeOn或observeOn),是返回一个新的Observable,而不是调用者的Observable,这里同样是重新创建了一个OnSubscribe,暂且叫做2号OnSubscribe,我们再回头看看,这个OnSubscribe与前面提到的just()方法里面调用到的OnSubscribe不是同一个对象

这里是线程切换的关键点。当调用链来到lift()方法后,使用的是lift()所返回的新的 Observable,也就是它所触发的onSubscribe.call(subscriber)也是用新的Observable中的新 OnSubscribe,即我们上面命名的2号OnSubscribe。

OperatorSubscribeOn

再来看lift()函数的源码,它的第二个try方法体里面有个onSubscribe,这个OnSubscribe就是我们前面定义的1号onSubscribe,它就是我们调用just()方法后创建的原始Observable。

那它是怎么做到切换线程的呢?如上面的例子,subscribeOn(Schedulers.io()),它通过下面的代码(举例)产生一个新的Subscriber:

Subscriber<? super T> st = hook.onLift(operator).call(o);//将新的Subscriber对象o传递给OperatorSubscribeOn,它里面的call()方法去创建新的Worker线程

//OperatorSubscribeOn的call(o)方法
@Override
    public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();//新建线程
        subscriber.add(inner);
        return new Subscriber<Observable<T>>(subscriber) {

            @Override
            public void onCompleted() {
                // ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }

            @Override
            public void onNext(final Observable<T> o) {
                inner.schedule(new Action0() {

                    @Override
                    public void call() {
                        final Thread t = Thread.currentThread();

                        o.unsafeSubscribe(new Subscriber<T>(subscriber) {

                            @Override
                            public void onCompleted() {
                                subscriber.onCompleted();
                            }

                            @Override
                            public void onError(Throwable e) {
                                subscriber.onError(e);
                            }

                            @Override
                            public void onNext(T t) {
                                subscriber.onNext(t);
                            }

                            @Override
                            public void setProducer(final Producer producer) {
                                subscriber.setProducer(new Producer() {

                                    @Override
                                    public void request(final long n) {
                                        if (Thread.currentThread() == t) {//如果是当前线程,则在当前线程执行
                                            // don't schedule if we're already on the thread (primarily for first setProducer call)
                                            // see unit test 'testSetProducerSynchronousRequest' for more context on this
                                            producer.request(n);
                                        } else {//不是当前线程,将在新创建的Worker线程inner中执行
                                            inner.schedule(new Action0() {

                                                @Override
                                                public void call() {
                                                    producer.request(n);
                                                }
                                            });
                                        }
                                    }

                                });
                            }

                        });
                    }
                });
            }

        };
    }

然后,通过调用1号OnSubscribe的call()方法 onSubscribe.call(st) 将新创建的Subscriber与原始的Observable关联起来,即新的Subscriber去订阅原始的Observable。这样,生产者

通过上面的代码可以知道,Scheduler类其实并不负责异步线程处理,它只负责通过createWorker()类创建出一个Worker对象,真正负责任务的延时处理。

OperatorObserveOn

observeOn方法内部也是调用了lift()方法,然后创建一个operator,

//OperatorObserveOn.java
@Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
            parent.init();
            return parent;
        }
    }

public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
            this.child = child;
            this.recursiveScheduler = scheduler.createWorker();//创建新的worker线程
            if (UnsafeAccess.isUnsafeAvailable()) {
                queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
            } else {
                queue = new SynchronizedQueue<Object>(RxRingBuffer.SIZE);
            }
            this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
        }

protected void schedule() {
            if (COUNTER_UPDATER.getAndIncrement(this) == 0) {
                recursiveScheduler.schedule(action);//用相应的线程进行数据输出调度
            }
        }

结合扔物线大大的图如下:


未完待续......

RxJava系列文章

参考

文章目录
  1. 1. 前言
  2. 2. 原理解析
  3. 3. OperatorSubscribeOn
  4. 4. OperatorObserveOn
  5. 5. 参考
|