基于RxJava的事件总线--优雅的替换Handler

以前我们一般是基于Android的Handler去实现一个消息队列的分发管理,但不方便点就在于UI线程和非UI线程间的切换。事件总线一般不在主线程运行,等事件分发完后,接收方如果想回到主线程,需要自己单独处理。 相对来说,要维护handler队列也是比较繁琐的一件事件。

传统方式

下面的这个类就是使用Android传统的handler去管理分发消息。Handler通过与Looper进行沟通,所在想要在非主线程分发消息,就需要创建一个HandlerThread。

public class EventCore {
    private static EventCore sInstance;
    private String mTag = "EventCore";
    private Context mContext;
    private HandlerThread mEventThread;
    private Handler mHandler;
    private HashMap<Integer, Event> mEventMap;
    private HashMap<Integer, ArrayList<IListener>> mListenerMap;
    private ExecutorService mExecutor;
    private boolean mRunning;

    private final int WHAT_QUIT = 0x01;
    private final int WHAT_REGISTER_LISTENER = WHAT_QUIT + 1;
    private final int WHAT_UNREGISTER_LISTENER = WHAT_REGISTER_LISTENER + 1;
    private final int WHAT_SEND = 0x01 << 24;

    EventCore(Context context) {
        mContext = context;
        mRunning = false;
        mNetService = new NetService(this);
        mDeviceService = new DeviceInfoService(this);
    }

    /**
     * 启动EventCore,只有启动之后才能投递事件与注册监听器
     */
    public void start() {
        if(!mRunning) {
            mEventThread = new HandlerThread(mTag);
            mEventThread.start();
            mHandler = new Handler(mEventThread.getLooper()) {
                @Override
                public void handleMessage(Message msg) {
                    switch (msg.what) {
                        case WHAT_QUIT:
                            mHandler.removeCallbacksAndMessages(null);
                            mEventThread.quit();
                            break;
                        case WHAT_REGISTER_LISTENER: {
                            ArrayList<IListener> listeners = mListenerMap.get(msg.arg1);
                            if(listeners == null) {
                                listeners = new ArrayList<>();
                                mListenerMap.put(msg.arg1, listeners);
                            }
                            if(!listeners.contains(msg.obj)) {
                                listeners.add((IListener)msg.obj);
                                if(msg.arg2 == 1 && mEventMap.containsKey(msg.arg1)) {
                                    ((IListener)(msg.obj)).onEvent(mEventMap.get(msg.arg1));
                                }
                            }
                            break;
                        }
                        case WHAT_UNREGISTER_LISTENER: {
                            ArrayList<IListener> listeners = mListenerMap.get(msg.arg1);
                            if(listeners != null) {
                                listeners.remove(msg.obj);
                            }
                            break;
                        }
                        default:
                            if(msg.what >= WHAT_SEND &&
                                    msg.obj instanceof Event) {
                                Event event = (Event)msg.obj;
                                mEventMap.put(event.getId(), event);
                                ArrayList<IListener> listeners = mListenerMap.get(event.getId());
                                if(listeners != null) {
                                    for(IListener listener : listeners) {
                                        listener.onEvent(event);
                                    }
                                }
                            }
                            break;
                    }
                }
            };
            mEventMap = new HashMap<>();
            mListenerMap = new HashMap<>();
            mExecutor = new ThreadPoolExecutor(1, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100, true));
            mRunning = true;
        }
    }

    /**
     * 停止EventCore
     */
    public void stop() {
        if(mRunning) {
            mRunning = false;
            mExecutor.shutdown();
            mHandler.sendEmptyMessage(WHAT_QUIT);
        }
    }

    public Context getContext() {
        return mContext;
    }

    public ExecutorService getExecutor() {
        return mExecutor;
    }

    /**
     * 注册监听器
     * @param id 该监听器关心的事件id
     * @param listener 监听器
     * @param notifyNow 如果存在该事件缓存,是否立马通知该监听器
     * @return 为链式编程方便而返回该实例
     */
    public EventCore registerListener(int id, IListener listener, boolean notifyNow) {
        Message message = mHandler.obtainMessage(WHAT_REGISTER_LISTENER, id, notifyNow ? 1 : 0, listener);
        message.sendToTarget();
        return this;
    }

    /**
     * 注销事件监听器
     * @param id 该监听器关心的事件id
     * @param listener 监听器
     * @return 为链式编程方便而返回该实例
     */
    public EventCore unregisterListener(int id, IListener listener) {
        Message message = mHandler.obtainMessage(WHAT_UNREGISTER_LISTENER, id, 0, listener);
        message.sendToTarget();
        return this;
    }

    public Looper getLooper() {
        return mEventThread.getLooper();
    }

    /**
     * 获取事件缓存
     * @param id 事件id
     * @return 若事件存在则返回该事件,否则返回null
     */
    public Event getEvent(int id) {
        return mEventMap.get(id);
    }

    /**
     * 投递一个事件到EventCore中,有EventCore分发该事件
     * @param event 待分发的事件
     * @return true 投递成功,false 投递失败
     */
    public boolean post(Event event) {
        if(mRunning) {
            Message message = mHandler.obtainMessage(WHAT_SEND + event.getId(), event);
            if(mHandler.hasMessages(message.what)) {
                mHandler.removeMessages(message.what);
            }
            mHandler.sendMessageDelayed(message, event.getDelay());
            return true;
        }
        return false;
    }

    /**
     * 异步创建一个事件,然后将该事件投递到EventCore中分发
     * @param creator 事件创建器
     * @return true 投递成功,false 投递失败
     */
    public boolean post(final Event.Creator creator) {
        if(mRunning) {
            mExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    Event event = new Event(creator.getId());
                    try {
                        event.setData(creator.getData());
                        event.setDelay(creator.getDelay());
                    } catch (Throwable t) {
                        event.setError(t);
                    }
                    post(event);
                }
            });
            return true;
        }
        return false;
    }

    /**
     * 创建实例,对于同一个进程只有第一次创建有效,重复创建没有任何反应
     * @param context app上下文
     * @return 返回EventCore实例
     */
    public synchronized static EventCore createInstance(Context context) {
        if(sInstance == null) {
            sInstance = new EventCore(context.getApplicationContext());
        }
        return sInstance;
    }

    /**
     * 获取实例,若在此之前未创建实例,则返null,严格限制app对EventCore创建、销毁的逻辑
     * @return EventCore实例
     */
    public synchronized static EventCore getInstance() {
        return sInstance;
    }
}

RxJava方式

在消息分发、线程切换上,RxJava的便利性就充分体现出了,我们只需要少量的代码就可以优雅的实现,省去了handler、AsynTask和接口回调等繁琐操作。

public class RxBus {
    private static final String TAG = AccountBus.class.getSimpleName();
    private static volatile AccountBus mInstance;

    public static AccountBus getInstance() {
        if (null == mInstance) {
            synchronized (AccountBus.class) {
                if(mInstance == null) {
                    mInstance = new AccountBus();
                }
            }
        }
        return mInstance;
    }

    private AccountBus() {
    }

    private ConcurrentHashMap<Object, Subject> mSubjectMapper = new ConcurrentHashMap<>();
    private ConcurrentHashMap<Object, Subscription> mDelaySubscriptionMapper = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    public <T> Observable<T> register(Object tag) {
        Subject subject= mSubjectMapper.get(tag);
        if (null == subject) {
            subject = PublishSubject.create();
        }
        mSubjectMapper.put(tag, subject);
        Log.d(TAG, "[register]mSubjectMapper: " + mSubjectMapper);
        return subject;
    }

    public void unregister(Object tag) {
        Subscription sub = mDelaySubscriptionMapper.remove(tag);
        if(sub != null){
            sub.unsubscribe();
        }
        mSubjectMapper.remove(tag);
        Log.d(TAG, "[unregister]mSubjectMapper: " + mSubjectMapper);
    }

    @SuppressWarnings("unchecked")
    public void post(Object tag, Object content) {
        Subject subject = mSubjectMapper.get(tag);
        if (subject != null) {
            Map<Object, Object> map = new HashMap<>();
            map.put(tag, content);//将tag也一起返回,这样方便使用者过虑信息,有时只需要关注是否是特定tag的事件
            subject.onNext(map);
        }
        Log.d(TAG, "[send]mSubjectMapper: " + mSubjectMapper);
    }

    /**
     *
     * @param tag
     * @param content
     * @param delayTime millseconds
     */
    @SuppressWarnings("unchecked")
    public void post(final Object tag, final Object content, long delayTime) {
        if(delayTime > 0) {//实现delay效果
            Subscription sub = Observable.timer(delayTime, TimeUnit.MILLISECONDS)
                    .subscribeOn(Schedulers.immediate())
                    .subscribe(new Action1<Object>(){

                        @Override
                        public void call(Object o) {
                            Subject subject = mSubjectMapper.get(tag);
                            if(subject != null) {
                                Map<Object, Object> map = new HashMap<>();
                                map.put(tag, content);
                                subject.onNext(map);
                            }
                        }
                    });
            mDelaySubscriptionMapper.put(tag, sub);
        } else {
            Subject subject = mSubjectMapper.get(tag);
            if (subject != null) {
                Map<Object, Object> map = new HashMap<>();
                map.put(tag, content);
                subject.onNext(map);
            }
        }
        Log.d(TAG, "[send]mSubjectMapper: " + mSubjectMapper);
    }
}

我们可以通过下面的方法来注册监听

    /**
     * 发起异步请求
     * @param tag 请求标志
     * @param content 请求参数
     * @param delayTime 延迟发起秒数
     */
    protected void postRequest(String tag, Object content, long delayTime) {
        RxBus.getInstance().register(tag).subscribe(new Observer<Object>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(Object o) {
                handleRequest();
            }
        });
    }

然后在适当的位置调用发送一个事件

RxBus.getInstance().post(tag, content, delayTime);
文章目录
  1. 1. 传统方式
  2. 2. RxJava方式
|