RxJava线程切换

前言

RxJava提供了多种线程调试,我们可以使用RxJava.Observable取代AsyncTask和AsyncTaskLoader。

  • subscribeOn() 用来指定Observable中的逻辑过程在特定的调度器上调用。
    • create()
    • just()
    • flatmap()
    • map()
    • filter()
    • doOnSubscribe()
    • 等等

subscribeOn() 原理图
  • observeOn() 用来指定Observable的结果数据在特定的调度器上调用。
    • onNext()
    • onError()
    • onComplete()
    • doOnNext()
    • map()
    • 等等

observeOn() 原理图

RxJava可以切换多种线程:

  • Schedulers.immediate() 在当前线程立即执行
  • Schedulers.trampoline() 在当前线程执行,但先等待当前线程执行完当前任务
  • Schedulers.newThread() 在新的线程上执行
  • Schedulers.computation() 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • Schedulers.io() 在I/O线程执行操作
  • AndroidSchedulers.mainThread() 在主线程执行

mainThread

指定在主线程执行:

Observable.just(1)
                .subscribeOn(AndroidSchedulers.mainThread())
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer in) {
                        Log.d(TAG, "[call] map_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] input: " + in);
                        int result = in * 2;
                        return result;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer result) {
                        Log.d(TAG, "[call] subscribe_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] output: " + result);
                    }
                });

输出结果为:

D/MainActivity: [call] map_thread: main
D/MainActivity: [call] input: 1
D/MainActivity: [call] subscribe_thread: main
D/MainActivity: [call] output: 2

通过输出结果,我们可以看到map中的操作和subscribe中的操作都是在main线程执行的。

I/O

指定在I/O线程执行。

Observable.just(1)
                .subscribeOn(Schedulers.io())
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer in) {
                        Log.d(TAG, "[call] map_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] input: " + in);
                        int result = in * 2;
                        return result;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer result) {
                        Log.d(TAG, "[call] subscribe_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] output: " + result);
                    }
                });

输出结果为:

D/MainActivity: [call] map_thread: RxCachedThreadScheduler-1
D/MainActivity: [call] input: 1
D/MainActivity: [call] subscribe_thread: RxCachedThreadScheduler-1
D/MainActivity: [call] output: 2

通过输出结果,我们可以看到map中的操作和subscribe中的操作都是在I/O线程执行的。

newThread

指定在一个新线程中执行。

Observable.just(1)
                .subscribeOn(Schedulers.newThread())
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer in) {
                        Log.d(TAG, "[call] map_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] input: " + in);
                        int result = in * 2;
                        return result;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer result) {
                        Log.d(TAG, "[call] subscribe_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] output: " + result);
                    }
                });

输出结果为:

D/MainActivity: [call] map_thread: RxNewThreadScheduler-1
D/MainActivity: [call] input: 1
D/MainActivity: [call] subscribe_thread: RxNewThreadScheduler-1
D/MainActivity: [call] output: 2

通过输出结果,我们可以看到map中的操作和subscribe中的操作都是在一个新线程中执行的。

如果不指定subscribeOn或observeOn指定线程,则使用当前线程执行。

Log.d(TAG, "[onCreate] cur_thread: " + Thread.currentThread().getName());
        Observable.just(1)
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer in) {
                        Log.d(TAG, "[call] map_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] input: " + in);
                        int result = in * 2;
                        return result;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer result) {
                        Log.d(TAG, "[call] subscribe_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] output: " + result);
                    }
                });

输出结果为:

D/MainActivity: [onCreate] cur_thread: main
D/MainActivity: [call] map_thread: main
D/MainActivity: [call] input: 1
D/MainActivity: [call] subscribe_thread: main
D/MainActivity: [call] output: 2

通过输出结果,我们可以看到,当前线程是主线程,map的执行和subscribe的执行都是在主线程中。

其它各种的指定线程类似,在此不再举例哈。

线程切换

其实前面我们已经使用到了线程切换,就是 Observable.just(1)的创建其实是运行在Activity的主线程中,紧接着通过subscribeOn(Schedulers.io()),将它切换到了I/O线程中执行map中的操作。

主线程与IO线程切换

我们来实现主线程与I/O线程的切换,I/O线程中执行map操作,主线程中执行subscribe。

Observable.just(1)
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer in) {
                        Log.d(TAG, "[call] map_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] input: " + in);
                        int result = in * 2;
                        return result;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer result) {
                        Log.d(TAG, "[call] subscribe_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] output: " + result);
                    }
                });

输出结果为:

D/MainActivity: [call] map_thread: RxCachedThreadScheduler-1
D/MainActivity: [call] input: 1
D/MainActivity: [call] subscribe_thread: main
D/MainActivity: [call] output: 2

map的调度器由紧跟着它的前面一个或者它后面的第一个subscribeOn决定。subscribe的调度器则由紧跟着它的observeOn决定。

再来变换一下map与subscribeOn、observeOn的位置,看看结果:

new Thread(new Runnable() {
            @Override
            public void run() {
                Observable.just(1)
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .map(new Func1<Integer, Integer>() {
                            @Override
                            public Integer call(Integer in) {
                                Log.d(TAG, "[call] map_thread: " + Thread.currentThread().getName());
                                Log.d(TAG, "[call] input: " + in);
                                int result = in * 2;
                                return result;
                            }
                        })
                        .subscribe(new Action1<Integer>() {
                            @Override
                            public void call(Integer result) {
                                Log.d(TAG, "[call] subscribe_thread: " + Thread.currentThread().getName());
                                Log.d(TAG, "[call] output: " + result);
                            }
                        });
            }
        }).start();

输出结果为:

D/MainActivity: [call] map_thread: main
D/MainActivity: [call] input: 1
D/MainActivity: [call] subscribe_thread: main
D/MainActivity: [call] output: 2

我们在一个新线程里面创建 Observable.just(1),然后调换了map与subscribeOn、observeOn的位置,通过输出可以知道,map受离它最近的调度器影响observeOn(AndroidSchedulers.mainThread()),在主线程中执行它的逻辑。

再来看一个例子:

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                Log.d(TAG, "[call] create_thread: " + Thread.currentThread().getName());
                subscriber.onNext(1);
                subscriber.onCompleted();
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer in) {
                        Log.d(TAG, "[call] map_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] input: " + in);
                        int result = in * 2;
                        return result;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer result) {
                        Log.d(TAG, "[call] subscribe_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] output: " + result);
                    }
                });

输出结果为:

D/MainActivity: [call] create_thread: RxCachedThreadScheduler-1
D/MainActivity: [call] map_thread: main
D/MainActivity: [call] input: 1
D/MainActivity: [call] subscribe_thread: main
D/MainActivity: [call] output: 2

我们可以看到,create()方法最近的一个subscribeOn决定了它call()所在的线程。

赵彦军 RxJava 和 RxAndroid 五(线程调度)

  1. 对于 create(),just(),from(),doOnSubscribe() 等创建Observable的,我们定义为事件产生

  2. 对于 map(),flapMap(),scan(),filter() 等处理逻辑过程(发射数据和通知)的,我们定义为事件加工

  3. 对于subscribe()操作的过程,我们定义为事件消费

    • 事件产生:默认运行在当前线程,可以由 subscribeOn() 自定义线程
    • 事件加工:默认跟事件产生的线程保持一致, 可以由 observeOn() 自定义线程
    • 事件消费:默认运行在当前线程,可以有observeOn() 自定义

doOnSubscribe

doOnSubscribe执行的线程由它后面紧根的subscribeOn决定,而不是它前面的。

Observable.just(1)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.d(TAG, "[call] doOnSubscribe_thread: " + Thread.currentThread().getName());
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer in) {
                        Log.d(TAG, "[call] map_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] input: " + in);
                        int result = in * 2;
                        return result;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer result) {
                        Log.d(TAG, "[call] subscribe_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] output: " + result);
                    }
                });

输出结果为:

D/MainActivity: [call] doOnSubscribe_thread: RxNewThreadScheduler-1
D/MainActivity: [call] map_thread: main
D/MainActivity: [call] input: 1
D/MainActivity: [call] subscribe_thread: main
D/MainActivity: [call] output: 2

如果我们将subscribeOn(Schedulers.newThread())移到.doOnSubscribe(new Action0())前面去,我们将会看到下面的输出结果:

Observable.just(1)
                .subscribeOn(Schedulers.io())
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.d(TAG, "[call] doOnSubscribe_thread: " + Thread.currentThread().getName());
                    }
                })

D/MainActivity: [call] doOnSubscribe_thread: main

即doOnSubscribe并没有受到它前面的subscribeOn影响,还是在创建线程即主线程执行。

doOnNext

doOnNext执行的线程由它前面紧跟的observeOn决定,而不是它后面的。

Observable.just(1)
                .subscribeOn(Schedulers.newThread())
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer in) {
                        Log.d(TAG, "[call] map_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] input: " + in);
                        int result = in * 2;
                        return result;
                    }
                })
                .observeOn(Schedulers.computation())
                .doOnNext(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG, "[call] doOnNext_thread: " + Thread.currentThread().getName());
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer result) {
                        Log.d(TAG, "[call] subscribe_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] output: " + result);
                    }
                });

输出结果为:

D/MainActivity: [call] map_thread: RxNewThreadScheduler-1
D/MainActivity: [call] input: 1
D/MainActivity: [call] doOnNext_thread: RxComputationThreadPool-3
D/MainActivity: [call] subscribe_thread: main
D/MainActivity: [call] output: 2

如果我们将doOnNext前的observeOn去掉,我们将会看到下面的输出结果:

D/MainActivity: [call] map_thread: RxNewThreadScheduler-1
D/MainActivity: [call] input: 1
D/MainActivity: [call] doOnNext_thread: RxNewThreadScheduler-1
D/MainActivity: [call] subscribe_thread: main
D/MainActivity: [call] output: 2

多个subscribeOn

它是定义事件产生线程的

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                Log.d(TAG, "[call] create_thread: " + Thread.currentThread().getName());
                subscriber.onNext(1);
                subscriber.onCompleted();
            }
        })
                .subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer in) {
                        Log.d(TAG, "[call] map1_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] input: " + in);
                        int result = in * 2;
                        return result;
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer result) {
                        Log.d(TAG, "[call] subscribe_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] output: " + result);
                    }
                });

输出结果为:

D/MainActivity: [call] create_thread: RxNewThreadScheduler-1
D/MainActivity: [call] map1_thread: RxNewThreadScheduler-1
D/MainActivity: [call] subscribe_thread: RxNewThreadScheduler-1

通过结果我们可以知道都运行在了同一个新线程里面,也就是说,就算链式里面写了多个 subscribeOn,链接还是只受第一个 subscribeOn 的影响。

多个observeOn

它是定义事件消费线程的

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                Log.d(TAG, "[call] create_thread: " + Thread.currentThread().getName());
                subscriber.onNext(1);
                subscriber.onCompleted();
            }
        })
                .observeOn(Schedulers.io())
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer in) {
                        Log.d(TAG, "[call] map1_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] input: " + in);
                        int result = in * 2;
                        return result;
                    }
                })
                .observeOn(Schedulers.newThread())
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer) {
                        Log.d(TAG, "[call] map2_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] input: " + integer);
                        int result = integer * 2;
                        return result;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer result) {
                        Log.d(TAG, "[call] subscribe_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] output: " + result);
                    }
                });

输出结果为:

D/MainActivity: [call] create_thread: main
D/MainActivity: [call] map1_thread: RxCachedThreadScheduler-1
D/MainActivity: [call] map2_thread: RxNewThreadScheduler-1
D/MainActivity: [call] subscribe_thread: main

通过结果我们可以看到,每一个线程的转化都生效了,也就是说在事件的加工、消费上是受离它最新的 observeOn 影响。

混合调度

多个线程间的来回切换

Observable.just(1)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.d(TAG, "[call] doOnSubscribe_thread: " + Thread.currentThread().getName());
                    }
                })
                .subscribeOn(Schedulers.computation())
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer in) {
                        Log.d(TAG, "[call] map1_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] input: " + in);
                        int result = in * 2;
                        return result;
                    }
                })
                .observeOn(Schedulers.newThread())
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer) {
                        Log.d(TAG, "[call] map2_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] input: " + integer);
                        return integer * 2;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG, "[call] doOnNext_thread: " + Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.io())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer result) {
                        Log.d(TAG, "[call] subscribe_thread: " + Thread.currentThread().getName());
                        Log.d(TAG, "[call] output: " + result);
                    }
                });

输出结果为:

D/MainActivity: [call] doOnSubscribe_thread: RxComputationThreadPool-3
D/MainActivity: [call] map1_thread: RxComputationThreadPool-3
D/MainActivity: [call] input: 1
D/MainActivity: [call] map2_thread: RxNewThreadScheduler-1
D/MainActivity: [call] input: 2
D/MainActivity: [call] doOnNext_thread: main
D/MainActivity: [call] subscribe_thread: RxCachedThreadScheduler-1
D/MainActivity: [call] output: 4

测试结论

  • observeOn 决定下游代码运行的线程,observeOn 如果不定义则是默认使用当前运行的线程。
  • observeOn 事件的加工、消费上是受离它最新的 observeOn 影响。
  • subscribeOn 决定最原始数据源发射数据代码运行的线程,而接下来的发射数据代码是由 observeOn 决定的。
  • subscribeOn 如果在链式中填写多个,只会受第一个的影响。RxJava分配线程时是通过调用链自下而上处理,所以最上面的 subscribeOn 会覆盖下面的定义,也就是说 subscribeOn 在调用链中最多定义一次就可以,位置任意。
  • doOnSubscribe 受其后的 subscribeOn 影响,doOnNext受其前的 observeOn 影响。默认情况下subcribe发生的线程决定了,doOnSubscribe()执行所在的线程。

参考

文章目录
  1. 1. 前言
  2. 2. mainThread
  3. 3. I/O
  4. 4. newThread
  5. 5. 线程切换
    1. 5.1. 主线程与IO线程切换
    2. 5.2. doOnSubscribe
    3. 5.3. doOnNext
    4. 5.4. 多个subscribeOn
    5. 5.5. 多个observeOn
    6. 5.6. 混合调度
  6. 6. 测试结论
  7. 7. 参考
|