RxJava 内部如何管理线程?

RxJava使用subscribeOn、observeOn和onNext的时候可以改变和切换线程,它们都是按顺序执行的,不是并发执行,至多也就切换到另外一个线程,如果它中间的操作是阻塞的,久会影响整个Rx的执行。

Rx是通过调度器来选择哪个线程执行的,RxJava内置了几种调度器,分别为不同的case提供线程:

调度器分类

调度器类型作用
Schedulers.computation( )用于计算任务,如事件循环或和回调处理,不要用于IO操作,默认线程数等于处理器的数量。它也是许多RxJava方法的默认调度器:buffer(),debounce(),delay(),interval(),sample(),skip()。
Schedulers.from(executor)使用指定的Executor作为调度器
Schedulers.immediate( )这个调度器允许你立即在当前线程执行你指定的工作。它是timeout(),timeInterval(),以及timestamp()方法默认的调度器
Schedulers.io( )用于IO密集型任务,如异步阻塞IO操作(读写文件、读写数据库、网络信息交互等),这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread( )为每个任务创建一个新线程,它是没有线程池在管理的
Schedulers.trampoline( )为当前线程建立一个队列,将当前任务加入到队列中依次执行。当其它排队的任务完成后,在当前线程排队开始执行
AndroidSchedulers.mainThread()在主线程中工作

我们来看一下源码,看它是如何区分线程的?

private Schedulers() {
        Scheduler c = RxJavaPlugins.getInstance().getSchedulersHook().getComputationScheduler();
        if (c != null) {
            computationScheduler = c;
        } else {
            computationScheduler = new EventLoopsScheduler();
        }

        Scheduler io = RxJavaPlugins.getInstance().getSchedulersHook().getIOScheduler();
        if (io != null) {
            ioScheduler = io;
        } else {
            ioScheduler = new CachedThreadScheduler();
        }

        Scheduler nt = RxJavaPlugins.getInstance().getSchedulersHook().getNewThreadScheduler();
        if (nt != null) {
            newThreadScheduler = nt;
        } else {
            newThreadScheduler = NewThreadScheduler.instance();
        }
    }

它内部维护了一个Schedulers的单例对象:private static final Schedulers INSTANCE = new Schedulers(); 在它的私有构造函数中,根据不同类型创建了相应的Scheduler,也即WorkThread。

我们先来看一下computation方式,它是一个EventLoopsScheduler,在这个类内部定义了一个静态的内部类,

 static final class FixedSchedulerPool {
        final int cores;

        final PoolWorker[] eventLoops;
        long n;

        FixedSchedulerPool() {
            // initialize event loops
            this.cores = MAX_THREADS; //决定线程池中线程个数
            this.eventLoops = new PoolWorker[cores];
            for (int i = 0; i < cores; i++) {
                this.eventLoops[i] = new PoolWorker(THREAD_FACTORY);
            }
        }

        public PoolWorker getEventLoop() {
            // simple round robin, improvements to come
            return eventLoops[(int)(n++ % cores)];
        }
    }

它的线程个数是根据MAX_THREADS来定的,那这个值又是在哪里赋值的呢?还是这个类,它有一个静态代码块:

/** 
     * Key to setting the maximum number of computation scheduler threads.
     * Zero or less is interpreted as use available. Capped by available.
     */
    static final String KEY_MAX_THREADS = "rx.scheduler.max-computation-threads";

static {
        int maxThreads = Integer.getInteger(KEY_MAX_THREADS, 0);
        int ncpu = Runtime.getRuntime().availableProcessors();
        int max;
        if (maxThreads <= 0 || maxThreads > ncpu) {
            max = ncpu;
        } else {
            max = maxThreads;
        }
        MAX_THREADS = max;
    }

它先取SystemProperty中的变量KEY_MAX_THREADS,然后判断这个值是不是大于cpu个数,如果小于,并且大小0则以这个值为线程的个数。也就是说,我们可以通过改变KEY_MAX_THREADS的值,来改变线程池中线程的最大个数,但不能大于cpu内核个数。

我们再来看一下io()方式:

private static final class CachedWorkerPool {
        private final long keepAliveTime;
        private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
        private final ScheduledExecutorService evictExpiredWorkerExecutor;

        CachedWorkerPool(long keepAliveTime, TimeUnit unit) {
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
		//创建线程池
            evictExpiredWorkerExecutor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
            evictExpiredWorkerExecutor.scheduleWithFixedDelay(
                    new Runnable() {
                        @Override
                        public void run() {
                            evictExpiredWorkers();
                        }
                    }, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS
            );
        }

        private static CachedWorkerPool INSTANCE = new CachedWorkerPool(
                60L, TimeUnit.SECONDS
        );//最大保活时间

        ThreadWorker get() {
            while (!expiringWorkerQueue.isEmpty()) {//如果空闲队列不为空,则从空闲队列中取一个线程来用,而不用每次新创建线程
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            // No cached worker found, so create a new one.
            return new ThreadWorker(WORKER_THREAD_FACTORY);
        }

        void release(ThreadWorker threadWorker) {
            // Refresh expire time before putting worker back in pool
            threadWorker.setExpirationTime(now() + keepAliveTime);

            expiringWorkerQueue.offer(threadWorker);
        }

        void evictExpiredWorkers() {
            if (!expiringWorkerQueue.isEmpty()) {
                long currentTimestamp = now();

                for (ThreadWorker threadWorker : expiringWorkerQueue) {
                    if (threadWorker.getExpirationTime() <= currentTimestamp) {
                        if (expiringWorkerQueue.remove(threadWorker)) {
                            threadWorker.unsubscribe();
                        }
                    } else {
                        // Queue is ordered with the worker that will expire first in the beginning, so when we
                        // find a non-expired worker we can stop evicting.
                        break;
                    }
                }
            }
        }

        long now() {
            return System.nanoTime();
        }
    }

CachedThreadScheduler类内部也定义了一个静态内部类,用来创建线程池。线程池是用Executors.newScheduledThreadPool来创建的,核心线程个数为1,并传入了一个自定义的RxThreadFactory:

private static final RxThreadFactory EVICTOR_THREAD_FACTORY =
            new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX);

RxThreadFactory类内部是通过AtomicLongFieldUpdater来为线程定义名字的,它以对指定"类的 'volatile long’类型的成员"进行原子更新。它是基于反射原理实现的。就是说,在不断调用io()的时候,如果没有空闲线程,则每次新建一个,这个线程的的名字是由前缀prefix与counter组成的。

public final class RxThreadFactory implements ThreadFactory {
    final String prefix;
    volatile long counter;
    static final AtomicLongFieldUpdater<RxThreadFactory> COUNTER_UPDATER
            = AtomicLongFieldUpdater.newUpdater(RxThreadFactory.class, "counter");

    public RxThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + COUNTER_UPDATER.incrementAndGet(this));
        t.setDaemon(true);
        return t;
    }
}

通过以上源码的分析,我们大致了解了RxJava是如何初始化、管理线程池的。我们也可以回顾一下在computation或io调度的模块里面使用Thread.currentThread().getName()方法,是不是会分别打印以下log:

08-10 22:33:37.180 11395-11565/com.ngudream.rxjava.example D/----->: ------->onNext线程:RxNewThreadScheduler-1
08-10 22:33:37.180 11395-11565/com.ngudream.rxjava.example D/----->: ------->call线程:RxNewThreadScheduler-1

08-10 22:33:39.810 11395-11582/com.ngudream.rxjava.example D/----->: ------->onNext线程:RxCachedThreadScheduler-1
08-10 22:33:39.812 11395-11582/com.ngudream.rxjava.example D/----->: ------->call线程:RxCachedThreadScheduler-1

08-10 22:33:42.112 11395-11601/com.ngudream.rxjava.example D/----->: ------->onNext线程:RxComputationThreadPool-1
08-10 22:33:42.113 11395-11601/com.ngudream.rxjava.example D/----->: ------->call线程:RxComputationThreadPool-1

总结

我们特别强调一下io和computation,因为它们两个都是依赖线程池来维护线程的,区别就是io线程池中的个数是无限的,由AtomicLongFieldUpdater产生的递增值和prefix来决定线程的名字;而computation中则是一个固定线程数量的线程池,数据为cpu个数,并且不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。


所以我们在使用时,需要注意,控制io线程的数量,如果你使用了大量的线程的话,可能会导致OutOfMemory等资源用尽的异常。


io() 的行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。


参考:

文章目录
  1. 1. 调度器分类
  2. 2. 总结
|