对RxJava中.repeatWhen()和.retryWhen()操作符的理解

笔者最近在使用RxJava开发项目,使用到了repeatWhen和retryWhen操作符,刚接触的时候确实是有不少困惑的,单从字面意思来看,它俩都有重复或重试的意思,但其实是有一些区别的,本文就在使用中的理解做个总结,如有错漏,不吝赐教。

先来一张大大Daniel Lew原文的图:

从上图可以看出,retryWhen发生在抛出异常即上图红色的e的时候,当进入retryWhen内部过程的时候,可以设置一定时间的delay,也可以不设置直接返回。它的返回类型要求是Observable的,因为这个返回是返回到链式结构里面,这样就可以不打破链式调用。

例如下面的:

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
	@Override
	public Observable<?> call(Observable<? extends Throwable> observable) {
		return Observable.timer(delayTime, TimeUnit.SECONDS);
	}
);

retryWhen是在抛出异常的情况下调用的,上面的代码也告诉了我们,在retryWhen里面是将throwable传进去了了。那应该怎么取这个异常呢?我们可以通过flatmap做个转换。

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Throwable> observable) {
                return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        ......//在这处我们可以针对不同的异常做一些处理,比如中断重试
                        return Observable.error(throwable);
                    }
                });
            }
        });

repeatWhen也是进行重试,但两者发生的时机是有区别,repeatWhen是指在指定时间间隔内,不断重复发起请求,直到有打断此重复请求的事情发生为止。来看一下代码:

source.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
                    @Override
                    public Observable<?> call(Observable<? extends Void> observable) {
                        return observable.delay(delayTime, TimeUnit.SECONDS);
                    }
                })

两者不同点

进入方法时的传参不同

通过上面的代码对比我们知道,在进入repeatWhen的时候,并没有将异常throwable传进去,它是一个Void型的传参,而retryWhen是一个Throwable型的传参。 repeatWhen比较适合实现轮循操作,observable不断按照delaytime的间隔进行重复调用,直到有打断此调用为止才调用onNext()或onError()。

那有几种方法可以打断此重试呢?有很多种,比如与range()操作符结合使用,达到指定范围的上限就会中断,又比如抛出异常,又比如与filter、takeUntil等操作符结合使用。下面举例两个与range、filter和takeUntil使用的例子。

//将执行两次,每次输入0到3,每次间隔2秒钟
Observable.range(0, 3).repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {  
          @Override  
          public Observable<?> call(Observable<? extends Void> observable) {  
              return Observable.timer(2, TimeUnit.SECONDS);  
          }  
      }).subscribe(new Observer<Integer>() {  
          @Override  
          public void onCompleted() {  
          }  

          @Override  
          public void onError(Throwable e) {  
          }  

          @Override  
          public void onNext(Integer integer) {    
			Log.d(TAG, "integer = " +integeer);
          }  
      });  

source.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
                    @Override
                    public Observable<?> call(Observable<? extends Void> observable) {
                        return observable.delay(DELAY_TIME, TimeUnit.SECONDS);
                    }
                })
                .takeUntil(new Func1<HttpResponse, Boolean>() {
                    @Override
                    public Boolean call(HttpResponse response) {
			//直到返回true
                        return true;
                    }
                })
                .filter(new Func1<HttpResponse, Boolean>() {//首次启动的时候,它会先于takeUntil调用
                    @Override
                    public Boolean call(HttpResponse response) {
			//只有返回true才会回调onnext
                        return true;
                    }
                })

两者回调不同

repeatWhen会在结束的时候回调onComplete,如果有异常则会回调onError。而retryWhen则不会回调onComplete,我们也可以看到,它是一个带throwable类型的,而onComplete是void型的。

轮循实例

在实际使用过程中,我们可以将它们组合来使用,一个负责在请求发生异常时进行重试,一个负责处理需要轮循的情况,比如轮循10次。下面我将以手机扫码后,轮循服务器的登录态为例来说明。

我们先建一个负责处理异常的重试类:

/**
 * 网络请求重试器
 */
public class RetryFactory implements Func1<Observable<? extends Throwable>, Observable<?>> {

    /**
     * 重试间隔时间
     */
    private long mInterval = 1;
    /**
     * 重试次数
     */
    private int mMaxRetryTime = 2;
    /**
     * 是否每次重试的时间间隔递增
     */
    private boolean mIsIncrease;

    private int mCurRetryCount;

    /**
     * 重试工厂
     * @param interval 每次重试间隔, 以秒为单位
     * @param isProgressiveIncrease 是否每次重试的时间间隔递增,递增规律为:interval当前执行到的重试次数的次方
     * @param retryTime 重试次数
     */
    public RetryFactory(long interval, boolean isProgressiveIncrease, int retryTime) {
        mInterval = interval;
        mMaxRetryTime = retryTime;
        mIsIncrease = isProgressiveIncrease;
    }


    /**
     * 对网络无连接的情况不进行重试,并且重试有超时机制与重试间隔。
     * @param observable
     * @return
     */
    @Override
    public Observable<?> call(final Observable<? extends Throwable> observable) {
        return observable.flatMap(new Func1<Throwable, Observable<?>>() {
            @Override
            public Observable<?> call(Throwable throwable) {
                if (throwable instanceof NetworkErrorException) {//没有网络、网络可连不可用、网络被劫持等直接抛异常,不进行重试
                    if(((NetworkErrorException) throwable).getStatusCode() == CODE_NETWORK_NOT_AVAILABLE
                            || ((NetworkErrorException) throwable).getStatusCode() == CODE_UNKOWN_HOST)
                    return Observable.error(throwable);
                }
                //重试mMaxRetryTime次
                if (++mCurRetryCount <= mMaxRetryTime) {
                    long delayTime = mInterval;
                    if(mIsIncrease){
                        delayTime = (long) Math.pow(mInterval, mCurRetryCount);
                    }
                    return Observable.timer(delayTime, TimeUnit.SECONDS);
                }
                return Observable.error(throwable);
            }
        });
    }
}

再建一个管理重试次数,即轮循的类:

/**
 * 重复发射器,通过自定义Transformer,将源Observable按照自定义的方式转化成另外一个新的Observable,
 * 并用Compose组合到使用此工厂的Obervable中
 */
public class RepeatFactory implements Func1<Observable<? extends Void>, Observable<?>> {

    private final String TAG = RepeatFactory.class.getSimpleName();

    /**
     * 重试间隔时间, 单位秒
     */
    private long mInterval = 1;
    /**
     * 重试次数
     */
    private int mMaxRetryTime = 2;

    private long mCurInterval;
    /**
     * 重试间隔时间增加阀值,不能大小 mMaxRetryTime
     */
    private int mIncreaseValve;

    /**
     * @param interval 每次重试间隔时间
     * @param retryTime 重试总次数
     * @param increaseValve 重试间隔时间增加阀值,不能大小 retryTime
     */
    public RepeatFactory(long interval, int retryTime, int increaseValve) {
        if(interval < 0) {
            mInterval = 0;
            mCurInterval = 0;
        } else {
            mInterval = interval;
            mCurInterval = interval;
        }
        if(retryTime <= 0) {
            mMaxRetryTime = 1;
        } else {
            mMaxRetryTime = retryTime;
        }
        if(increaseValve > mMaxRetryTime){
            mIncreaseValve = mMaxRetryTime;
        } else {
            mIncreaseValve = increaseValve;
        }
    }

    @Override
    public Observable<?> call(Observable<? extends Void> observable) {
        return observable.compose(zipWithFlatMap());
    }

    private <T> Observable.Transformer<T, Long> zipWithFlatMap() {
        return new Observable.Transformer<T, Long>() {

            @Override
            public Observable<Long> call(Observable<T> observable) {
			//重试次数
                return observable.zipWith(Observable.range(1, mMaxRetryTime), new Func2<T, Integer, Integer>() {
                    @Override
                    public Integer call(T t, Integer repeatAttempt) {
                        return repeatAttempt;
                    }
                }).flatMap(new Func1<Integer, Observable<Long>>() {
                    @Override
                    public Observable<Long> call(Integer repeatAttempt) {
                        //增加等待时间
                        long interal = mInterval;
                        int curCount = repeatAttempt;
                        if (++curCount > mIncreaseValve) {
                            mCurInterval = mInterval + repeatAttempt >> 4;
                            interal = mCurInterval;
                        }
                        return Observable.timer(interal, TimeUnit.SECONDS);
                    }
                });
            }
        };
    }
}

它们两个组合起来就可以实现对特殊异常进行重试,网络异常直接结束,按指定次数进行轮循等。

mHttpManager.postAsync(URLConstants.LOGIN_STATE_URL, params, headerData, false)
                .retryWhen(new RetryFactory(mRetryInternal, mMaxRetryTime, false))//发生异常时进行重试
                .repeatWhen(new RepeatFactory(mRepeatInteral, mMaxRepeatTime, mIncreaseValve))//轮循次数控制
                .takeUntil(new Func1<HttpResponse, Boolean>() {//Thread: RxIoScheduler
                    @Override
                    public Boolean call(HttpResponse response) {//直到返回true
                        return isJobDone(response, true);
                    }
                })
                .filter(new Func1<HttpResponse, Boolean>() {//Thread: RxIoScheduler
                    @Override
                    public Boolean call(HttpResponse response) {//只有返回true才会回调onnext, 先回调
                        return isJobDone(response, true);
                    }
                })
                .subscribeOn(Schedulers.io())
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        //订阅前可以先检查网络可用性,不可用就不要发起订阅了,直接抛异常退出
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<HttpResponse>() {
                    @Override
                    public void onCompleted() {//repeatwhen重复次数结束时只回调onCompleted
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(HttpResponse response) {
                    }
                });

不错的博文 给 Android 开发者的 RxJava 详解

文章目录
  1. 1. 两者不同点
    1. 1.1. 进入方法时的传参不同
    2. 1.2. 两者回调不同
  2. 2. 轮循实例
|