Observable.map源码分析

响应式编程思想就是不需要考虑函数的调用顺序,只需要考虑接口的结果,在返回结果之前,会经过一系列事件的链式调用,这些事件像流一样的传播(并且可能不按顺序)出去,然后影响结果。我们接着来看一下RxJava的map操作符。

对Observable发射的每一项数据应用一个函数,执行变换操作:

作用简介

Map操作符对原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable。
RxJava将这个操作符实现为map函数。这个操作符默认不在任何特定的调度器上执行。

如下代码,我们用一个boolean型的input作为输入,然后我们可以根据输入来做自己的操作,然后返回一个boolean型的结果。

boolean input = true;
Observable.just(input)
                .map(new Func1<Boolean, Boolean>() {
                    @Override
                    public Boolean call(Boolean in) {
                        boolean result = false;
                        if(in){
                            //自己的动作
                        } else {
                            //自己的动作
                        }
                        return result;
                    }
                })
                .subscribe(new Action1<Boolean>() {
                    @Override
                    public void call(Boolean aBoolean) {

                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {

                    }
                });

map的创建

map并没有直接Observable.map()的静态方法,都是需要先创建一个Observable,然后再调用map()方法。它们的输入参数是一致的,在这里都是Object。

Observable.create(new Observable.OnSubscribe<Object>() {
            @Override
            public void call(Subscriber<? super Object> subscriber) {

            }
        })
                .map(new Func1<Object, Object>() {
                    @Override
                    public Object call(Object o) {
                        return null;
                    }
                });

map方法内部是直接调用的lift()函数,它不像flatMap方法内部还调用ScalarSynchronousObservable或者merge方法。并且参数Func1也不一样,map是返回一个数据对象,flatMap则是返回一个Observable。通过文章我们也可以知道,flatMap内部调用的merge方法的参数就是调用map方法。

/**
     * Returns an Observable that applies a specified function to each item emitted by the source Observable and
     * emits the results of these function applications.
     */
    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }

还有一点,map调用lift方法时就不像flatmap那样可以有多种Operator,它是直接传入一个OperatorMap:

/**
 * Applies a function of your choosing to every item emitted by an {@code Observable}, and emits the results of
 * this transformation as a new {@code Observable}.
 */
public final class OperatorMap<T, R> implements Operator<R, T> {

    private final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    onError(OnErrorThrowable.addValueAsLastCause(e, t));
                }
            }

        };
    }

}

lift()函数的具体介绍可以参考文章 RxJava基本流程和lift源码分析

/**
     * Lifts a function to the current Observable and returns a new Observable that when subscribed to will pass
     * the values of the current Observable through the Operator function.
     */
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = hook.onLift(operator).call(o);//产生操作的关键
                    try {
                        // new Subscriber created and being subscribed with so 'onStart' it
                        st.onStart();
                        onSubscribe.call(st);
                    } catch (Throwable e) {
                        // localized capture of errors rather than it skipping all operators
                        // and ending up in the try/catch of the subscribe method which then
                        // prevents onErrorResumeNext and other similar approaches to error handling
                        if (e instanceof OnErrorNotImplementedException) {
                            throw (OnErrorNotImplementedException) e;
                        }
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    if (e instanceof OnErrorNotImplementedException) {
                        throw (OnErrorNotImplementedException) e;
                    }
                    // if the lift function failed all we can do is pass the error to the final Subscriber
                    // as we don't have the operator available to us
                    o.onError(e);
                }
            }
        });
    }

总结

map 为一对一变换。即输入一个Object并输出一个Object,这个Object可以是单对象,也可以是对象数组,这个可以自由把握。在这个过程中,我们可以自由的对输入进行相应的变换处理然后输出结果对象即可。

Observable.flatmap源码分析

文章目录
  1. 1. 作用简介
  2. 2. map的创建
  3. 3. 总结
|