RxJava timers: timer

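This article looks at RxJava's delayed event operator, timer. It creates an Observable that emits a single item (the Long value 0L) after a specified delay and then completes. The subscriber can react to that item however it likes, for example by updating some data or running an operation, so timer can be thought of as a way to delay an operation.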

Flow diagram of timer:
[Figure: timer]

Creating a timer

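Internally, Observable.timer still goes through create, so it ends up on the same code path as a direct call to Observable.create(). The only difference is what gets passed in: create accepts any Observable.OnSubscribe, and timer supplies an OnSubscribeTimerOnce, which is an OnSubscribe<Long> implementation. The source makes this clear: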

// How Observable.timer(delay, TimeUnit.MILLISECONDS) is created
    /**
     * Returns an Observable that emits one item after a specified delay, and then completes.
     */
    public final static Observable<Long> timer(long delay, TimeUnit unit) {
        return timer(delay, unit, Schedulers.computation());
    }

    /**
     * Returns an Observable that emits one item after a specified delay, on a specified Scheduler, and then
     * completes.
     */
    public final static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) {
        return create(new OnSubscribeTimerOnce(delay, unit, scheduler));
    }

    /**
     * Returns an Observable that will execute the specified function when a {@link Subscriber} subscribes to
     * it.
     */
    public final static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
    }

How timer implements the delay

The delay itself is implemented in OnSubscribeTimerOnce. The class is short and simple; the important part is the Worker, which is what actually runs the scheduled action.

/**
 * Timer that emits a single 0L and completes after the specified time.
 * @see <a href='http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.timer.aspx'>MSDN Observable.Timer</a>
 */
public final class OnSubscribeTimerOnce implements OnSubscribe<Long> {
    final long time;
    final TimeUnit unit;
    final Scheduler scheduler;

    public OnSubscribeTimerOnce(long time, TimeUnit unit, Scheduler scheduler) {
        this.time = time;
        this.unit = unit;
        this.scheduler = scheduler;
    }

    @Override
    public void call(final Subscriber<? super Long> child) {
        Worker worker = scheduler.createWorker();
        child.add(worker);
        worker.schedule(new Action0() { // the key part: this schedules the delayed emission
            @Override
            public void call() {
                try {
                    child.onNext(0L);
                } catch (Throwable t) {
                    child.onError(t);
                    return;
                }
                child.onCompleted();
            }
        }, time, unit);
    }   
}
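
To make the control flow above easier to follow outside of RxJava, here is a minimal sketch that reproduces the same "emit 0L once, then complete" logic with nothing but the JDK. MiniSubscriber and TimerOnceSketch are hypothetical names invented for this illustration, and a ScheduledExecutorService stands in for the Scheduler.Worker; this is not the library's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for rx.Subscriber, reduced to its three callbacks.
interface MiniSubscriber<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

// Plain-JDK model of OnSubscribeTimerOnce (an illustration, not RxJava code).
final class TimerOnceSketch {
    private TimerOnceSketch() {}

    // Schedules a single emission of 0L after the delay, followed by completion.
    static void timerOnce(long time, TimeUnit unit,
                          ScheduledExecutorService executor,
                          final MiniSubscriber<? super Long> child) {
        Runnable action = () -> {
            try {
                child.onNext(0L);
            } catch (Throwable t) {
                child.onError(t);
                return;
            }
            child.onCompleted();
        };
        executor.schedule(action, time, unit);
    }

    // Runs the timer once and returns the callbacks that fired, in order.
    static List<String> runOnce(long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        final List<String> events = Collections.synchronizedList(new ArrayList<String>());
        final CountDownLatch done = new CountDownLatch(1);
        timerOnce(delayMillis, TimeUnit.MILLISECONDS, executor, new MiniSubscriber<Long>() {
            @Override public void onNext(Long value) { events.add("onNext(" + value + ")"); }
            @Override public void onError(Throwable t) { events.add("onError"); done.countDown(); }
            @Override public void onCompleted() { events.add("onCompleted"); done.countDown(); }
        });
        try {
            done.await(5, TimeUnit.SECONDS); // wait for the terminal callback
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(runOnce(100));
    }
}
```

As in the real class, a failure inside onNext is routed to onError and suppresses onCompleted, so the subscriber sees exactly one terminal event.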

Worker implementations

Worker is an abstract static class nested in Scheduler that implements Subscription. Its main implementations are:

  • EventLoopWorker in CachedThreadScheduler (rx.schedulers)
  • EventLoopWorker in EventLoopsScheduler (rx.internal.schedulers)
  • ExecutorSchedulerWorker in ExecutorScheduler (rx.schedulers)
  • HandlerWorker in HandlerScheduler (rx.android.schedulers)
  • InnerCurrentThreadScheduler in TrampolineScheduler (rx.schedulers)
  • InnerImmediateScheduler in ImmediateScheduler (rx.schedulers)
  • InnerTestScheduler in TestScheduler (rx.schedulers)
  • NewThreadWorker (rx.internal.schedulers)
  • PoolWorker in EventLoopsScheduler (rx.internal.schedulers)
  • ThreadWorker in CachedThreadScheduler (rx.schedulers)

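A Worker dispatches an action through schedule(Action0 action), which runs it as soon as possible, or through schedule(Action0 action, long delayTime, TimeUnit unit), which runs it after the specified delay.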
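
The contract of those two schedule overloads can be sketched with the JDK alone. MiniWorker below is a hypothetical class written for this illustration: it assumes a single-threaded ScheduledExecutorService as the backing executor and uses Future in place of Subscription, so it only approximates what a real Scheduler.Worker does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified worker: Runnable replaces Action0, Future replaces Subscription.
final class MiniWorker {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    // Zero-delay overload: delegates to the delayed one.
    Future<?> schedule(Runnable action) {
        return schedule(action, 0, TimeUnit.MILLISECONDS);
    }

    // Delayed overload: the action runs no earlier than delayTime from now.
    Future<?> schedule(Runnable action, long delayTime, TimeUnit unit) {
        if (isUnsubscribed()) {
            // Nothing is scheduled once the worker is shut down.
            CompletableFuture<Void> cancelled = new CompletableFuture<>();
            cancelled.cancel(false);
            return cancelled;
        }
        return executor.schedule(action, delayTime, unit);
    }

    void unsubscribe() {
        executor.shutdownNow(); // also drops actions that have not started yet
    }

    boolean isUnsubscribed() {
        return executor.isShutdown();
    }

    // Returns how many milliseconds passed before a delayed action actually ran.
    static long measureDelay(long delayMillis) {
        MiniWorker worker = new MiniWorker();
        final AtomicLong ranAt = new AtomicLong();
        long start = System.nanoTime();
        try {
            worker.schedule(() -> ranAt.set(System.nanoTime()),
                    delayMillis, TimeUnit.MILLISECONDS).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.unsubscribe();
        }
        return TimeUnit.NANOSECONDS.toMillis(ranAt.get() - start);
    }

    public static void main(String[] args) {
        System.out.println("action ran after " + measureDelay(100) + " ms");
    }
}
```

The delay is a lower bound rather than an exact time: the action may run later than requested if the worker thread is busy, but not earlier.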

HandlerWorker

Let's look at HandlerWorker first. It should feel familiar: it uses Android's Handler to implement the delay. It is an inner class of rx.android.schedulers.HandlerScheduler.

static class HandlerWorker extends Worker {

        private final Handler handler;

        private final CompositeSubscription compositeSubscription = new CompositeSubscription();

        HandlerWorker(Handler handler) {
            this.handler = handler;
        }

        @Override
        public void unsubscribe() {
            compositeSubscription.unsubscribe();
        }

        @Override
        public boolean isUnsubscribed() {
            return compositeSubscription.isUnsubscribed();
        }

        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            if (compositeSubscription.isUnsubscribed()) {
                return Subscriptions.unsubscribed();
            }

            action = RxAndroidPlugins.getInstance().getSchedulersHook().onSchedule(action);

            final ScheduledAction scheduledAction = new ScheduledAction(action); // Runnable wrapper: its run() calls action.call()
            scheduledAction.addParent(compositeSubscription);
            compositeSubscription.add(scheduledAction);

            handler.postDelayed(scheduledAction, unit.toMillis(delayTime)); // this is where the delay happens

            scheduledAction.add(Subscriptions.create(new Action0() {
                @Override
                public void call() {
                    handler.removeCallbacks(scheduledAction);
                }
            }));

            return scheduledAction;
        }

        @Override
        public Subscription schedule(final Action0 action) {
            return schedule(action, 0, TimeUnit.MILLISECONDS);
        }
    }

EventLoopWorker

Next, EventLoopWorker. It is built on Java's ScheduledExecutorService, which can run tasks after a given delay. It is an inner class of rx.internal.schedulers.EventLoopsScheduler.

private static class EventLoopWorker extends Scheduler.Worker {
        private final SubscriptionList serial = new SubscriptionList();
        private final CompositeSubscription timed = new CompositeSubscription();
        private final SubscriptionList both = new SubscriptionList(serial, timed);
        private final PoolWorker poolWorker;

        EventLoopWorker(PoolWorker poolWorker) {
            this.poolWorker = poolWorker;

        }

        @Override
        public void unsubscribe() {
            both.unsubscribe();
        }

        @Override
        public boolean isUnsubscribed() {
            return both.isUnsubscribed();
        }

        @Override
        public Subscription schedule(Action0 action) {
            if (isUnsubscribed()) {
                return Subscriptions.unsubscribed();
            }
            ScheduledAction s = poolWorker.scheduleActual(action, 0, null, serial);

            return s;
        }
        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            if (isUnsubscribed()) {
                return Subscriptions.unsubscribed();
            }
            ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit, timed);

            return s;
        }
    }

The key piece is PoolWorker. It extends NewThreadWorker, and NewThreadWorker uses a ScheduledExecutorService internally to delay the action.

 private static final class PoolWorker extends NewThreadWorker {
        PoolWorker(ThreadFactory threadFactory) {
            super(threadFactory);
        }
    }

The actual implementation of poolWorker.scheduleActual lives in rx.internal.schedulers.NewThreadWorker:

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
        Action0 decoratedAction = schedulersHook.onSchedule(action);
        ScheduledAction run = new ScheduledAction(decoratedAction, parent);
        parent.add(run);

        Future<?> f; // handle to the submitted task: exposes its state and result, and run.add(f) below lets an unsubscribe cancel it
        if (delayTime <= 0) {
            f = executor.submit(run);
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
        run.add(f);

        return run;
    }
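
The submit-or-schedule branch and the role of the returned Future can be tried out in isolation. The sketch below is a plain-JDK approximation with hypothetical names (ScheduleActualSketch and its helper methods); it leaves out the ScheduledAction wrapper and the scheduler hook, and cancels the Future directly where RxJava would unsubscribe.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the branch inside scheduleActual, using only the JDK.
final class ScheduleActualSketch {
    private ScheduleActualSketch() {}

    // No delay: submit right away. Positive delay: let the executor wait first.
    static Future<?> scheduleActual(ScheduledExecutorService executor,
                                    Runnable run, long delayTime, TimeUnit unit) {
        if (delayTime <= 0) {
            return executor.submit(run); // unit is not used on this path
        }
        return executor.schedule(run, delayTime, unit);
    }

    // A zero-delay task with a null unit, as in schedule(Action0) above.
    static boolean immediateTaskRan() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        final AtomicBoolean ran = new AtomicBoolean(false);
        try {
            scheduleActual(executor, () -> ran.set(true), 0, null).get();
            return ran.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    // Cancels a delayed task through its Future and reports whether it still ran.
    static boolean cancelledTaskRan() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        final AtomicBoolean ran = new AtomicBoolean(false);
        try {
            Future<?> f = scheduleActual(executor, () -> ran.set(true),
                    200, TimeUnit.MILLISECONDS);
            f.cancel(true);
            // Wait on a no-op task that is due after the cancelled task's deadline.
            scheduleActual(executor, () -> { }, 300, TimeUnit.MILLISECONDS).get();
            return ran.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("immediate task ran: " + immediateTaskRan());
        System.out.println("cancelled task ran: " + cancelledTaskRan());
    }
}
```

Holding on to the Future is what makes a pending delayed action cancellable, which is why scheduleActual attaches it to the ScheduledAction it returns.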

timer compared with interval and range

timer has things in common with interval and range, but they behave differently. In short:

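  • Observable.timer(5, TimeUnit.SECONDS) emits one item after 5 seconds; in effect, a delayed event.
  • Observable.interval(3, TimeUnit.SECONDS) emits an item every 3 seconds; in effect, a repeating operation.
  • Observable.range(1, 10) emits 10 integers starting from 1; in effect, an operation over a given range.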
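
The three emission patterns can be compared side by side with a JDK-only analogy. Everything in EmissionPatterns below is hypothetical and written for this comparison: it imitates the shape of each operator's output with a ScheduledExecutorService and a plain loop, and calls no RxJava API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Plain-JDK analogy only; none of these methods are RxJava API.
final class EmissionPatterns {
    private EmissionPatterns() {}

    // Like timer: a single 0L after the delay.
    static List<Long> timerLike(long delayMillis) {
        return collect(delayMillis, 1, false);
    }

    // Like interval: 0, 1, 2, ... one value per period; stops after `take` values.
    static List<Long> intervalLike(long periodMillis, int take) {
        return collect(periodMillis, take, true);
    }

    // Like range(start, count): `count` consecutive integers, with no waiting.
    static List<Integer> rangeLike(int start, int count) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.add(start + i);
        }
        return out;
    }

    private static List<Long> collect(long millis, int take, boolean repeat) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        final List<Long> out = Collections.synchronizedList(new ArrayList<Long>());
        final CountDownLatch remaining = new CountDownLatch(take);
        final AtomicLong next = new AtomicLong();
        Runnable emit = () -> {
            if (remaining.getCount() > 0) { // ignore ticks once enough values arrived
                out.add(next.getAndIncrement());
                remaining.countDown();
            }
        };
        try {
            if (repeat) {
                executor.scheduleAtFixedRate(emit, millis, millis, TimeUnit.MILLISECONDS);
            } else {
                executor.schedule(emit, millis, TimeUnit.MILLISECONDS);
            }
            remaining.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        System.out.println(timerLike(100));
        System.out.println(intervalLike(50, 3));
        System.out.println(rangeLike(1, 10));
    }
}
```

The one-shot and periodic cases differ only in which executor method is called, which mirrors how close timer and interval are to each other, while range involves no scheduling at all.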

A timer example

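With the above, we have a good idea of what timer can do and how it works. To wrap up, here is an example that uses it as a replacement for Android's Handler. Let's call the class RxHandler.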

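import android.os.Message;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

public class RxHandler {

    // Pending runnables and the subscriptions that can cancel them
    private static final ConcurrentHashMap<Object, Subscription> mRunnableMap = new ConcurrentHashMap<>();

    /**
     * Sends an empty message.
     * @param what the message code to emit
     * @return an Observable that emits what and then completes
     */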
    public static Observable<Object> sendEmptyMessage(final int what){
        return Observable.create(new Observable.OnSubscribe<Object>() {
            @Override
            public void call(Subscriber<? super Object> subscriber) {
                if(!subscriber.isUnsubscribed()){
                    subscriber.onNext(what);
                    subscriber.onCompleted();
                }
            }
        });
    }

    /**
     * Sends a message.
     * @param msg the message to emit
     * @return an Observable that emits msg and then completes
     */
    public static Observable<Object> sendMessage(final Message msg){
        return Observable.create(new Observable.OnSubscribe<Object>() {
            @Override
            public void call(Subscriber<? super Object> subscriber) {
                if(!subscriber.isUnsubscribed()){
                    subscriber.onNext(msg);
                    subscriber.onCompleted();
                }
            }
        });
    }

    /**
     * Runs a runnable with no delay.
     * @param runnable the task to run
     * @param mainThread whether to run the runnable on the main thread
     */
    public static Subscription post(final Runnable runnable, boolean mainThread) {
        return postDelayed(runnable, 0, mainThread);
    }

    /**
     * Runs a runnable after a delay.
     *
     * @param runnable   the task to run; must not be null
     * @param delay      delay in milliseconds
     * @param mainThread whether to run the runnable on the main thread
     * @return a Subscription that can be used to cancel the pending runnable
     */
    public static Subscription postDelayed(final Runnable runnable, long delay, boolean mainThread) {
        if (runnable == null) {
            // ConcurrentHashMap rejects null keys, so fail before any timer is started
            throw new NullPointerException("runnable == null");
        }
        Subscription s = Observable.timer(delay, TimeUnit.MILLISECONDS)
                // pick the thread on which the runnable is invoked
                .observeOn(mainThread ? AndroidSchedulers.mainThread() : Schedulers.computation())
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        // remove first, so a runnable that re-posts itself keeps its new entry
                        mRunnableMap.remove(runnable);
                        runnable.run();
                    }
                });
        if (!s.isUnsubscribed()) {
            // skip registration when the timer has already fired and completed
            mRunnableMap.put(runnable, s);
        }
        return s;
    }

    public static void removeCallbacks(Runnable runnable) {
        Subscription s = mRunnableMap.remove(runnable);
        if (s != null && !s.isUnsubscribed()) {
            s.unsubscribe();
        }
    }
}

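For an event bus built on RxJava, see the author's other article, "基于RxJava的事件总线–优雅的替换Handler" (an RxJava-based event bus as an elegant replacement for Handler).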
