Observable.flatmap源码分析

FlatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。简单的说,就是flatMap()的作用是输入一个Observable,再输出一个经过特定变换的Observable。

作用简介

FlatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable
FlatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。

如下代码,我们用一个boolean型的input作为输入,然后经过判断,如果为true,则输出一个List<string>的Observable.just(list),如果为false,则直接返回一个空值null的Observable.just(null)。

boolean input = true;
        Observable.just(input)
                .flatMap(new Func1<Boolean, Observable<List<String>>>() {
                    @Override
                    public Observable<List<String>> call(Boolean result) {
                        if(result){
                            List<String> list = new ArrayList<String>();
                            return Observable.just(list);
                        } else {
                            return Observable.just(null);
                        }
                    }
                })
                .subscribe(new Observer<List<String>>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(List<String> list) {

                    }
                });

flatmap的创建

flatmap并没有直接Observable.flatMap()的静态方法,都是需要先创建一个Observable,然后再调用flatMap()方法。它们的输入参数是一致的,在这里都是Object。

Observable.create(new Observable.OnSubscribe<Object>() {//先建一个Observable
            @Override
            public void call(Subscriber<? super Object> subscriber) {

            }
        })
                .flatMap(new Func1<Object, Observable<?>>() {//调用flatMap方法
                    @Override
                    public Observable<?> call(Object o) {
                        return null;
                    }
                })

flatMap方法内部分两条路走,一条是ScalarSynchronousObservable,一条是调用merge方法。

/**
     * Returns an Observable that emits items based on applying a function that you supply to each item emitted
     * by the source Observable, where that function returns an Observable, and then merging those resulting
     */
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
        return merge(map(func));
    }

ScalarSynchronousObservable 方式

flatmap方法会先判断当前的Observable是不是ScalarSynchronousObservable(继承自Observable<T>)类型的,如果是则会直接执行它的scalarFlatMap方法,见上面的代码。而scalarFlatMap又做了什么呢?先看代码:

public <R> Observable<R> scalarFlatMap(final Func1<? super T, ? extends Observable<? extends R>> func) {
        return create(new OnSubscribe<R>() {
            @Override
            public void call(final Subscriber<? super R> child) {
                Observable<? extends R> o = func.call(t);
                if (o.getClass() == ScalarSynchronousObservable.class) {
                    child.onNext(((ScalarSynchronousObservable<? extends R>)o).t);
                    child.onCompleted();
                } else {
                    o.unsafeSubscribe(new Subscriber<R>(child) {
                        @Override
                        public void onNext(R v) {
                            child.onNext(v);
                        }
                        @Override
                        public void onError(Throwable e) {
                            child.onError(e);
                        }
                        @Override
                        public void onCompleted() {
                            child.onCompleted();
                        }
                    });
                }
            }
        });
    }

它就是直接return一个Observable,上面提到过,它是继承自Observable<T>,这里调用的create()方法就是调用父类的create方法:

public final static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
    }

merge方式

如果当前Observable不是ScalarSynchronousObservable类型的,则调用Observable的merge方法:merge(map(func)),它的参数是一个map产生的Observable,代码如下:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }

我们看到,map里面又调用了lift方法,它是RxJava中实现自定义Operator的关键,实质是对自定义事件序列的处理和再发送,翻看源码我们可以知道,在RxJava中,对事件的处理发送都是基于同一个基础的变换方法lift()。

/**
     * Lifts a function to the current Observable and returns a new Observable that when subscribed to will pass
     * the values of the current Observable through the Operator function.
     */
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = hook.onLift(operator).call(o);//产生操作的关键
                    try {
                        // new Subscriber created and being subscribed with so 'onStart' it
                        st.onStart();
                        onSubscribe.call(st);
                    } catch (Throwable e) {
                        // localized capture of errors rather than it skipping all operators
                        // and ending up in the try/catch of the subscribe method which then
                        // prevents onErrorResumeNext and other similar approaches to error handling
                        if (e instanceof OnErrorNotImplementedException) {
                            throw (OnErrorNotImplementedException) e;
                        }
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    if (e instanceof OnErrorNotImplementedException) {
                        throw (OnErrorNotImplementedException) e;
                    }
                    // if the lift function failed all we can do is pass the error to the final Subscriber
                    // as we don't have the operator available to us
                    o.onError(e);
                }
            }
        });
    }

从源码可以看到,它实质是新new一个Observable返回。call()方法首先是取一个Subscriber,即一个Operator。

Subscriber<? super T> st = hook.onLift(operator).call(o);

Operator的实现类主要有以下这些(长长的列表啊!!!):

  • OperatorAll (rx.internal.operators) OperatorAny (rx.internal.operators)
  • OperatorAsObservable (rx.internal.operators)
  • OperatorBufferWithSingleObservable (rx.internal.operators)
  • OperatorBufferWithSize (rx.internal.operators)
  • OperatorBufferWithStartEndObservable (rx.internal.operators)
  • OperatorBufferWithTime (rx.internal.operators)
  • OperatorCast (rx.internal.operators)
  • OperatorConcat (rx.internal.operators)
  • OperatorDebounceWithSelector (rx.internal.operators)
  • OperatorDebounceWithTime (rx.internal.operators)
  • OperatorDelay (rx.internal.operators)
  • OperatorDelayWithSelector (rx.internal.operators)
  • OperatorDematerialize (rx.internal.operators)
  • OperatorDistinct (rx.internal.operators)
  • OperatorDistinctUntilChanged (rx.internal.operators)
  • OperatorDoOnEach (rx.internal.operators)
  • OperatorDoOnRequest (rx.internal.operators)
  • OperatorDoOnSubscribe (rx.internal.operators)
  • OperatorDoOnUnsubscribe (rx.internal.operators)
  • OperatorElementAt (rx.internal.operators)
  • OperatorFilter (rx.internal.operators)
  • OperatorFinally (rx.internal.operators)
  • OperatorGroupBy (rx.internal.operators)
  • OperatorIgnoreElements (rx.internal.operators)
  • OperatorMap (rx.internal.operators)
  • OperatorMapNotification (rx.internal.operators)
  • OperatorMapPair (rx.internal.operators)
  • OperatorMaterialize (rx.internal.operators)
  • OperatorMerge (rx.internal.operators)
  • OperatorObserveOn (rx.internal.operators)
  • OperatorOnBackpressureBlock (rx.internal.operators)
  • OperatorOnBackpressureBuffer (rx.internal.operators)
  • OperatorOnBackpressureDrop (rx.internal.operators)
  • OperatorOnBackpressureLatest (rx.internal.operators)
  • OperatorOnErrorResumeNextViaFunction (rx.internal.operators)
  • OperatorOnErrorResumeNextViaObservable (rx.internal.operators)
  • OperatorOnErrorReturn (rx.internal.operators)
  • OperatorOnExceptionResumeNextViaObservable (rx.internal.operators)
  • OperatorRetryWithPredicate (rx.internal.operators)
  • OperatorSampleWithObservable (rx.internal.operators)
  • OperatorSampleWithTime (rx.internal.operators)
  • OperatorScan (rx.internal.operators)
  • OperatorSerialize (rx.internal.operators)
  • OperatorSingle (rx.internal.operators)
  • OperatorSkip (rx.internal.operators)
  • OperatorSkipLast (rx.internal.operators)
  • OperatorSkipLastTimed (rx.internal.operators)
  • OperatorSkipTimed (rx.internal.operators)
  • OperatorSkipUntil (rx.internal.operators)
  • OperatorSkipWhile (rx.internal.operators)
  • OperatorSubscribeOn (rx.internal.operators)
  • OperatorSwitch (rx.internal.operators)
  • OperatorSwitchIfEmpty (rx.internal.operators)
  • OperatorTake (rx.internal.operators)
  • OperatorTakeLast (rx.internal.operators)
  • OperatorTakeLastOne (rx.internal.operators)
  • OperatorTakeLastTimed (rx.internal.operators)
  • OperatorTakeTimed (rx.internal.operators)
  • OperatorTakeUntil (rx.internal.operators)
  • OperatorTakeUntilPredicate (rx.internal.operators)
  • OperatorTakeWhile (rx.internal.operators)
  • OperatorThrottleFirst (rx.internal.operators)
  • OperatorTimeInterval (rx.internal.operators)
  • OperatorTimeoutBase (rx.internal.operators)
  • OperatorTimestamp (rx.internal.operators)
  • OperatorToMap (rx.internal.operators)
  • OperatorToMultimap (rx.internal.operators)
  • OperatorToObservableList (rx.internal.operators)
  • OperatorToObservableSortedList (rx.internal.operators)
  • OperatorUnsubscribeOn (rx.internal.operators)
  • OperatorWindowWithObservable (rx.internal.operators)
  • OperatorWindowWithObservableFactory (rx.internal.operators)
  • OperatorWindowWithSize (rx.internal.operators)
  • OperatorWindowWithStartEndObservable (rx.internal.operators)
  • OperatorWindowWithTime (rx.internal.operators)
  • OperatorWithLatestFrom (rx.internal.operators)
  • OperatorZip (rx.internal.operators)
  • OperatorZipIterable (rx.internal.operators)

我们以OperatorDoOnSubscribe为例,看一下call方法的源码。它还是比较简单的,就是直接返回一个Subscriber。

//OperatorDoOnSubscribe.call():
@Override
    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        subscribe.call();
        // Pass through since this operator is for notification only, there is
        // no change to the stream whatsoever.
        return Subscribers.wrap(child);
    }

//Subscribers.wrap()
public static <T> Subscriber<T> wrap(final Subscriber<? super T> subscriber) {
        return new Subscriber<T>(subscriber) {

            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }

            @Override
            public void onNext(T t) {
                subscriber.onNext(t);
            }

        };
    }

我们再回头看一下merge方法的源码:

public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
        if (source.getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
        }
        return source.lift(OperatorMerge.<T>instance(false));
    }

发现没有,它也是调用了Observable的lift方法,正像上面所说的,lift是产生操作的基础。

Func1参数

RxJava有一系列的(Func+数字)的接口,Func1就是其中一个,它继承自Function类,并提供了一个泛型的call方法,就是一个操作逻辑,就像Runnable的run方法一样,我们可以在它里面执行自己定义的逻辑。

/**
 * Represents a function with one argument.
 */
public interface Func1<T, R> extends Function {
    R call(T t);
}

实现Func1这个接口的类也很多,我就不一一列举了。举个例子,比如OperatorObserveOn (rx.internal.operators)类,它实现了Operator<T, T>,而Operator<T, T>又是继承自Func1<Subscriber<? super R>, Subscriber<? super T>>的。

在flatmap中,Func1需要返回一个Observable。

总结

flatMap()操作符使用原本会被原始Observable(即create/just等产生的)发送的事件,来创建一个新的Observable,然后将它发送出来。这个Observable可以是我们定义一个的单操作结果,也可以是一系列操作的合集,还可以是一个exception: return Observable.error(new Throwable(“it is an exception”))。

但有一点我们得记住,flatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的;如果想按照严格的顺序发射传入的数据,我们可以使用ConcatMap。

flatMap

concatMap

flatMapIterable

switchMap

Observable.map源码分析

什么鬼?国庆节还在撸代码?疯了......(;′⌒`)
明天就回家了(๑→ܫ←)
文章目录
  1. 1. 作用简介
  2. 2. flatmap的创建
    1. 2.1. ScalarSynchronousObservable 方式
    2. 2.2. merge方式
  3. 3. Func1参数
  4. 4. 总结
|