RxJava使用subscribeOn、observeOn和onNext的时候可以改变和切换线程,它们都是按顺序执行的,不是并发执行,至多也就切换到另外一个线程,如果它中间的操作是阻塞的,久会影响整个Rx的执行。
Rx是通过调度器来选择哪个线程执行的,RxJava内置了几种调度器,分别为不同的case提供线程:
调度器分类
调度器类型 | 作用 |
---|---|
Schedulers.computation( ) | 用于计算任务,如事件循环或和回调处理,不要用于IO操作,默认线程数等于处理器的数量。它也是许多RxJava方法的默认调度器:buffer(),debounce(),delay(),interval(),sample(),skip()。 |
Schedulers.from(executor) | 使用指定的Executor作为调度器 |
Schedulers.immediate( ) | 这个调度器允许你立即在当前线程执行你指定的工作。它是timeout(),timeInterval(),以及timestamp()方法默认的调度器 |
Schedulers.io( ) | 用于IO密集型任务,如异步阻塞IO操作(读写文件、读写数据库、网络信息交互等),这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个任务创建一个新线程,它是没有线程池在管理的 |
Schedulers.trampoline( ) | 为当前线程建立一个队列,将当前任务加入到队列中依次执行。当其它排队的任务完成后,在当前线程排队开始执行 |
AndroidSchedulers.mainThread() | 在主线程中工作 |
我们来看一下源码,看它是如何区分线程的?
private Schedulers() {
Scheduler c = RxJavaPlugins.getInstance().getSchedulersHook().getComputationScheduler();
if (c != null) {
computationScheduler = c;
} else {
computationScheduler = new EventLoopsScheduler();
}
Scheduler io = RxJavaPlugins.getInstance().getSchedulersHook().getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = new CachedThreadScheduler();
}
Scheduler nt = RxJavaPlugins.getInstance().getSchedulersHook().getNewThreadScheduler();
if (nt != null) {
newThreadScheduler = nt;
} else {
newThreadScheduler = NewThreadScheduler.instance();
}
}
它内部维护了一个Schedulers的单例对象:private static final Schedulers INSTANCE = new Schedulers(); 在它的私有构造函数中,根据不同类型创建了相应的Scheduler,也即WorkThread。
我们先来看一下computation方式,它是一个EventLoopsScheduler,在这个类内部定义了一个静态的内部类,
static final class FixedSchedulerPool {
final int cores;
final PoolWorker[] eventLoops;
long n;
FixedSchedulerPool() {
// initialize event loops
this.cores = MAX_THREADS; //决定线程池中线程个数
this.eventLoops = new PoolWorker[cores];
for (int i = 0; i < cores; i++) {
this.eventLoops[i] = new PoolWorker(THREAD_FACTORY);
}
}
public PoolWorker getEventLoop() {
// simple round robin, improvements to come
return eventLoops[(int)(n++ % cores)];
}
}
它的线程个数是根据MAX_THREADS来定的,那这个值又是在哪里赋值的呢?还是这个类,它有一个静态代码块:
/**
* Key to setting the maximum number of computation scheduler threads.
* Zero or less is interpreted as use available. Capped by available.
*/
static final String KEY_MAX_THREADS = "rx.scheduler.max-computation-threads";
static {
int maxThreads = Integer.getInteger(KEY_MAX_THREADS, 0);
int ncpu = Runtime.getRuntime().availableProcessors();
int max;
if (maxThreads <= 0 || maxThreads > ncpu) {
max = ncpu;
} else {
max = maxThreads;
}
MAX_THREADS = max;
}
它先取SystemProperty中的变量KEY_MAX_THREADS,然后判断这个值是不是大于cpu个数,如果小于,并且大小0则以这个值为线程的个数。也就是说,我们可以通过改变KEY_MAX_THREADS的值,来改变线程池中线程的最大个数,但不能大于cpu内核个数。
我们再来看一下io()方式:
private static final class CachedWorkerPool {
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
private final ScheduledExecutorService evictExpiredWorkerExecutor;
CachedWorkerPool(long keepAliveTime, TimeUnit unit) {
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
//创建线程池
evictExpiredWorkerExecutor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
evictExpiredWorkerExecutor.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
evictExpiredWorkers();
}
}, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS
);
}
private static CachedWorkerPool INSTANCE = new CachedWorkerPool(
60L, TimeUnit.SECONDS
);//最大保活时间
ThreadWorker get() {
while (!expiringWorkerQueue.isEmpty()) {//如果空闲队列不为空,则从空闲队列中取一个线程来用,而不用每次新创建线程
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
return new ThreadWorker(WORKER_THREAD_FACTORY);
}
void release(ThreadWorker threadWorker) {
// Refresh expire time before putting worker back in pool
threadWorker.setExpirationTime(now() + keepAliveTime);
expiringWorkerQueue.offer(threadWorker);
}
void evictExpiredWorkers() {
if (!expiringWorkerQueue.isEmpty()) {
long currentTimestamp = now();
for (ThreadWorker threadWorker : expiringWorkerQueue) {
if (threadWorker.getExpirationTime() <= currentTimestamp) {
if (expiringWorkerQueue.remove(threadWorker)) {
threadWorker.unsubscribe();
}
} else {
// Queue is ordered with the worker that will expire first in the beginning, so when we
// find a non-expired worker we can stop evicting.
break;
}
}
}
}
long now() {
return System.nanoTime();
}
}
CachedThreadScheduler类内部也定义了一个静态内部类,用来创建线程池。线程池是用Executors.newScheduledThreadPool来创建的,核心线程个数为1,并传入了一个自定义的RxThreadFactory:
private static final RxThreadFactory EVICTOR_THREAD_FACTORY =
new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX);
RxThreadFactory类内部是通过AtomicLongFieldUpdater来为线程定义名字的,它以对指定"类的 'volatile long’类型的成员"进行原子更新。它是基于反射原理实现的。就是说,在不断调用io()的时候,如果没有空闲线程,则每次新建一个,这个线程的的名字是由前缀prefix与counter组成的。
public final class RxThreadFactory implements ThreadFactory {
final String prefix;
volatile long counter;
static final AtomicLongFieldUpdater<RxThreadFactory> COUNTER_UPDATER
= AtomicLongFieldUpdater.newUpdater(RxThreadFactory.class, "counter");
public RxThreadFactory(String prefix) {
this.prefix = prefix;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, prefix + COUNTER_UPDATER.incrementAndGet(this));
t.setDaemon(true);
return t;
}
}
通过以上源码的分析,我们大致了解了RxJava是如何初始化、管理线程池的。我们也可以回顾一下在computation或io调度的模块里面使用Thread.currentThread().getName()方法,是不是会分别打印以下log:
08-10 22:33:37.180 11395-11565/com.ngudream.rxjava.example D/----->: ------->onNext线程:RxNewThreadScheduler-1
08-10 22:33:37.180 11395-11565/com.ngudream.rxjava.example D/----->: ------->call线程:RxNewThreadScheduler-1
08-10 22:33:39.810 11395-11582/com.ngudream.rxjava.example D/----->: ------->onNext线程:RxCachedThreadScheduler-1
08-10 22:33:39.812 11395-11582/com.ngudream.rxjava.example D/----->: ------->call线程:RxCachedThreadScheduler-1
08-10 22:33:42.112 11395-11601/com.ngudream.rxjava.example D/----->: ------->onNext线程:RxComputationThreadPool-1
08-10 22:33:42.113 11395-11601/com.ngudream.rxjava.example D/----->: ------->call线程:RxComputationThreadPool-1
总结
我们特别强调一下io和computation,因为它们两个都是依赖线程池来维护线程的,区别就是io线程池中的个数是无限的,由AtomicLongFieldUpdater产生的递增值和prefix来决定线程的名字;而computation中则是一个固定线程数量的线程池,数据为cpu个数,并且不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
所以我们在使用时,需要注意,控制io线程的数量,如果你使用了大量的线程的话,可能会导致OutOfMemory等资源用尽的异常。
io() 的行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
参考: