RxJava Subject解析

以下为官方介绍:

Subject 类定义

Subject 可以看成是一个桥梁或者代理,在某些 ReactiveX 实现中(如 RxJava ),它同时充当了 Observer 和 Observable 的角色。因为它是一个 Observer,它可以订阅一个或多个Observable;又因为它是一个 Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。

由于一个 Observable 订阅一个 Observable,它可以触发这个 Observable 开始发射数据(如果那个 Observable 是“冷”的,就是说,它等待有订阅才开始发射数据)。因此有这样的效果,Subject 可以把原来那个”冷”的 Observable 变成“热”的。

Subject 类定义如下:

/**
 * Represents an object that is both an Observable and an Observer.
 */
public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {
    protected Subject(OnSubscribe<R> onSubscribe) {
        super(onSubscribe);
    }

通过源码我们可以知道,Subject 继承自 Observable,也就是说 Subject 可以作为被观察者,还有,它又实现了 Observer 接口,这就是说它也可以作为观察者。

一共有五种类型的Subject,分别为

AsyncSubject

一个 AsyncSubject 只在原始 Observable 完成后,发射来自原始 Observable 的最后一个值。(如果原始 Observable 没有发射任何值,AsyncObject 也不发射任何值)它会把这最后一个值发射给任何后续的观察者。

然而,如果原始的 Observable 因为发生了错误而终止,AsyncSubject 将不会发射任何数据,只是简单的向前传递这个错误通知。

样例代码:

@Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        
        final AsyncSubject<Integer> asyncSub = AsyncSubject.create();
        asyncSub.subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "AsyncSubject : OnCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "AsyncSubject : OnError: " + e.getMessage());
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "AsyncSubject : onNext:" + value);
            }
        });


        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; i < 5; i++) {
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "Observable OnCompleted");
                asyncSub.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, " Observable onError");
                asyncSub.onError(e);
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, " Observable onNext:" + value);
                asyncSub.onNext(value);
            }
        });
    }

运行结果如下:

03-06 21:12:39.933 6746-6746/? D/MainActivity:  Observable onNext:0
03-06 21:12:39.933 6746-6746/? D/MainActivity:  Observable onNext:1
03-06 21:12:39.933 6746-6746/? D/MainActivity:  Observable onNext:2
03-06 21:12:39.933 6746-6746/? D/MainActivity: Observable OnCompleted
03-06 21:12:39.933 6746-6746/? D/MainActivity: AsyncSubject : onNext:2 //发射最后的值,为2
03-06 21:12:39.933 6746-6746/? D/MainActivity: AsyncSubject : OnCompleted

BehaviorSubject

当观察者订阅 BehaviorSubject 时,它开始发射原始 Observable 最近发射的数据**(如果此时还没有收到任何数据,它会发射一个默认值)**,然后继续发射其它任何来自原始 Observable 的数据。

然而,如果原始的 Observable 因为发生了一个错误而终止,BehaviorSubject 将不会发射任何数据,只是简单的向前传递这个错误通知。

发送默认值样例代码:

@Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        final BehaviorSubject<Integer> behaviorSub = BehaviorSubject.create(10);//默认值
        behaviorSub.subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "BehaviorSubject : OnCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "BehaviorSubject : OnError: " + e.getMessage());
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "BehaviorSubject : onNext:" + value);
            }
        });


        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; i <= 2; i++) {
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "Observable OnCompleted");
                behaviorSub.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, " Observable onError");
                behaviorSub.onError(e);
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, " Observable onNext:" + value);
                behaviorSub.onNext(value);
            }
        });
    }

运行结果如下:

03-06 21:15:20.955 10545-10545/? D/MainActivity: BehaviorSubject : onNext:10  //先发射了默认值
03-06 21:15:20.955 10545-10545/? D/MainActivity:  Observable onNext:0
03-06 21:15:20.955 10545-10545/? D/MainActivity: BehaviorSubject : onNext:0
03-06 21:15:20.955 10545-10545/? D/MainActivity:  Observable onNext:1
03-06 21:15:20.955 10545-10545/? D/MainActivity: BehaviorSubject : onNext:1
03-06 21:15:20.955 10545-10545/? D/MainActivity:  Observable onNext:2
03-06 21:15:20.955 10545-10545/? D/MainActivity: BehaviorSubject : onNext:2
03-06 21:15:20.955 10545-10545/? D/MainActivity: Observable OnCompleted
03-06 21:15:20.955 10545-10545/? D/MainActivity: BehaviorSubject : OnCompleted

不发送默认值样例代码:

@Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        final BehaviorSubject<Integer> behaviorSub = BehaviorSubject.create(10);

        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; i <= 5; i++) {
                    if(i == 1){
                        behaviorSub.subscribe(new Observer<Integer>() {
                            @Override
                            public void onCompleted() {
                                Log.d(TAG, "BehaviorSubject : OnCompleted");
                            }

                            @Override
                            public void onError(Throwable e) {
                                Log.d(TAG, "BehaviorSubject : OnError: " + e.getMessage());
                            }

                            @Override
                            public void onNext(Integer value) {
                                Log.d(TAG, "BehaviorSubject : onNext:" + value);
                            }
                        });
                    }
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "Observable OnCompleted");
                behaviorSub.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, " Observable onError");
                behaviorSub.onError(e);
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, " Observable onNext:" + value);
                behaviorSub.onNext(value);
            }
        });
    }

运行结果如下:

03-06 21:31:42.768 32375-32375/? D/MainActivity:  Observable onNext:0
03-06 21:31:42.769 32375-32375/? D/MainActivity: BehaviorSubject : onNext:0
03-06 21:31:42.769 32375-32375/? D/MainActivity:  Observable onNext:1
03-06 21:31:42.769 32375-32375/? D/MainActivity: BehaviorSubject : onNext:1
03-06 21:31:42.769 32375-32375/? D/MainActivity:  Observable onNext:2
03-06 21:31:42.769 32375-32375/? D/MainActivity: BehaviorSubject : onNext:2
03-06 21:31:42.769 32375-32375/? D/MainActivity:  Observable onNext:3
03-06 21:31:42.769 32375-32375/? D/MainActivity: BehaviorSubject : onNext:3
03-06 21:31:42.769 32375-32375/? D/MainActivity:  Observable onNext:4
03-06 21:31:42.769 32375-32375/? D/MainActivity: BehaviorSubject : onNext:4
03-06 21:31:42.769 32375-32375/? D/MainActivity:  Observable onNext:5
03-06 21:31:42.769 32375-32375/? D/MainActivity: BehaviorSubject : onNext:5
03-06 21:31:42.769 32375-32375/? D/MainActivity: Observable OnCompleted
03-06 21:31:42.769 32375-32375/? D/MainActivity: BehaviorSubject : OnCompleted

PublishSubject

PublishSubject 只会把在订阅发生的时间点之后来自原始 Observable 的数据发射给观察者。需要注意的是,PublishSubject 可能会一创建完成就立刻开始发射数据(除非你可以阻止它发生),因此这里有一个风险:在 Subject 被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。如果要确保来自原始 Observable 的所有数据都被分发,你需要这样做:或者使用 Create 创建那个 Observable 以便手动给它引入“冷” Observable 的行为(当所有观察者都已经订阅时才开始发射数据),或者改用 ReplaySubject。

如果原始的 Observable 因为发生了一个错误而终止,BehaviorSubject 将不会发射任何数据,只是简单的向前传递这个错误通知。

样例代码:

@Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        final PublishSubject<Integer> pubSubject = PublishSubject.create();

        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; i <= 5; i++) {
                    if(i == 1){//到1才订阅,之前的0就不会被发送
                        pubSubject.subscribe(new Observer<Integer>() {
                            @Override
                            public void onCompleted() {
                                Log.d(TAG, "PublishSubject : OnCompleted");
                            }

                            @Override
                            public void onError(Throwable e) {
                                Log.d(TAG, "PublishSubject : OnError: " + e.getMessage());
                            }

                            @Override
                            public void onNext(Integer value) {
                                Log.d(TAG, "PublishSubject : onNext:" + value);
                            }
                        });
                    }
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "Observable OnCompleted");
                pubSubject.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, " Observable onError");
                pubSubject.onError(e);
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, " Observable onNext:" + value);
                pubSubject.onNext(value);
            }
        });
    }

运行结果如下:

03-06 21:29:11.442 28789-28789/? D/MainActivity:  Observable onNext:0 //没有订阅,不被发送给PublishSubject
03-06 21:29:11.442 28789-28789/? D/MainActivity:  Observable onNext:1
03-06 21:29:11.442 28789-28789/? D/MainActivity: PublishSubject : onNext:1
03-06 21:29:11.442 28789-28789/? D/MainActivity:  Observable onNext:2
03-06 21:29:11.442 28789-28789/? D/MainActivity: PublishSubject : onNext:2
03-06 21:29:11.442 28789-28789/? D/MainActivity:  Observable onNext:3
03-06 21:29:11.442 28789-28789/? D/MainActivity: PublishSubject : onNext:3
03-06 21:29:11.442 28789-28789/? D/MainActivity:  Observable onNext:4
03-06 21:29:11.442 28789-28789/? D/MainActivity: PublishSubject : onNext:4
03-06 21:29:11.442 28789-28789/? D/MainActivity:  Observable onNext:5
03-06 21:29:11.442 28789-28789/? D/MainActivity: PublishSubject : onNext:5
03-06 21:29:11.442 28789-28789/? D/MainActivity: Observable OnCompleted
03-06 21:29:11.442 28789-28789/? D/MainActivity: PublishSubject : OnCompleted

ReplaySubject

ReplaySubject 会发射所有来自原始 Observable 的数据给观察者,无论它们是何时订阅的。也有其它版本的 ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据(原始 Observable 发射的)。

如果你把 ReplaySubject 当作一个观察者使用,注意不要从多个线程中调用它的 onNext 方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反 Observable 协议,给Subject 的结果增加了不确定性。

样例代码:

@Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        final ReplaySubject<Integer> replaySub = ReplaySubject.create();

        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; i <= 5; i++) {
                    if(i == 3){//到3的时候再订阅,但它之前的0、1、2都被重新发送了
                        replaySub.subscribe(new Observer<Integer>() {
                            @Override
                            public void onCompleted() {
                                Log.d(TAG, "ReplaySubject : OnCompleted");
                            }

                            @Override
                            public void onError(Throwable e) {
                                Log.d(TAG, "ReplaySubject : OnError: " + e.getMessage());
                            }

                            @Override
                            public void onNext(Integer value) {
                                Log.d(TAG, "ReplaySubject : onNext:" + value);
                            }
                        });
                    }
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "Observable OnCompleted");
                replaySub.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, " Observable onError");
                replaySub.onError(e);
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, " Observable onNext:" + value);
                replaySub.onNext(value);
            }
        });
    }

运行结果如下:

03-06 21:35:57.907 5864-5864/? D/MainActivity:  Observable onNext:0
03-06 21:35:57.907 5864-5864/? D/MainActivity:  Observable onNext:1
03-06 21:35:57.907 5864-5864/? D/MainActivity:  Observable onNext:2
03-06 21:35:57.908 5864-5864/? D/MainActivity: ReplaySubject : onNext:0 //之前的0、1、2都被重新发送了
03-06 21:35:57.908 5864-5864/? D/MainActivity: ReplaySubject : onNext:1
03-06 21:35:57.908 5864-5864/? D/MainActivity: ReplaySubject : onNext:2
03-06 21:35:57.908 5864-5864/? D/MainActivity:  Observable onNext:3
03-06 21:35:57.908 5864-5864/? D/MainActivity: ReplaySubject : onNext:3
03-06 21:35:57.908 5864-5864/? D/MainActivity:  Observable onNext:4
03-06 21:35:57.908 5864-5864/? D/MainActivity: ReplaySubject : onNext:4
03-06 21:35:57.908 5864-5864/? D/MainActivity:  Observable onNext:5
03-06 21:35:57.908 5864-5864/? D/MainActivity: ReplaySubject : onNext:5
03-06 21:35:57.908 5864-5864/? D/MainActivity: Observable OnCompleted
03-06 21:35:57.908 5864-5864/? D/MainActivity: ReplaySubject : OnCompleted

Subjcet 注意点

  • Subjcet 不是线程安全的。

使用 Subject 不是线程安全的 ,如果在不同的线程调用它的 onNext 方法,很有可能造成竞态条件(race conditions),我们应该尽可能避免这种情况的出现,因为除非在代码中写足够详细的注释,否则日后维护这段代码的程序员很可能莫名其妙地踩了坑。如果你认为你确实有必要使用 Subject, 那么请把它转化为 SerializedSubject,它可以保证如果多个线程同时调用 onNext 方法,依然是线程安全的。

SerializedSubject<Integer,Integer> subject = PublishSubject.<Integer>create().toSerialized();
文章目录
  1. 1. Subject 类定义
  2. 2. AsyncSubject
  3. 3. BehaviorSubject
  4. 4. PublishSubject
  5. 5. ReplaySubject
  6. 6. Subjcet 注意点
|