Kotlin Meets RxJava

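RxJava code is concise and readable. It handles multithreading internally and ships with a rich set of operators. The map and flatMap operators, for example, let us run follow-up processing that depends on the data from a previous API call without nesting callbacks. With lambda support from Java 1.8 or from third-party libraries, RxJava code becomes shorter still. This post pairs RxJava with Kotlin to make the same code even simpler and more elegant.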
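To make the "no nesting" point concrete, here is a minimal sketch of two dependent steps chained with flatMap and map. It assumes RxJava 2 (`io.reactivex`) is on the classpath; `fetchUserId`, `fetchProfile`, and `loadGreeting` are hypothetical stand-ins for real API calls, not part of any library.

```kotlin
import io.reactivex.Observable

// Hypothetical stand-ins for two API calls; the second depends on the first.
fun fetchUserId(): Observable<Int> = Observable.just(42)

fun fetchProfile(userId: Int): Observable<String> = Observable.just("profile-$userId")

// The dependent call is chained with flatMap instead of being nested
// inside the first call's callback; map then transforms the result.
fun loadGreeting(): Observable<String> =
    fetchUserId()
        .flatMap { id -> fetchProfile(id) }
        .map { profile -> "loaded $profile" }

fun main() {
    loadGreeting().subscribe { greeting -> println(greeting) }
}
```

Each additional dependent step would add one more operator to the chain rather than one more level of indentation.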

Creating an Observable

The RxJava2 way:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("hello world");
            }
        });

With Kotlin:

val observable: Observable<String> = Observable.create(
        ObservableOnSubscribe<String> { emitter -> emitter.onNext("hello world") })

Thread Switching

The RxJava2 way:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("hello world");
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String value) {
                        Log.d(TAG, "result = " + value);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) {

                    }
                });

With Kotlin:

Observable.create(ObservableOnSubscribe<String>
                { emitter -> emitter.onNext("hello world") })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe{ result ->  Log.d(TAG, "result = " + result) }

Error Handling

The RxJava2 way:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) {
                try{
                    emitter.onNext("hello world");
                    emitter.onComplete();
                } catch (Exception e){
                    emitter.onError(e);
                }
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String value) {
                        Log.d(TAG, "result = " + value);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) {
                        Log.d(TAG, "error = " + throwable.getMessage());
                    }
                });

With Kotlin:

Observable.create(ObservableOnSubscribe<String>
        { emitter ->
            try {
                emitter.onNext("hello world")
                emitter.onComplete()
            } catch (error: Exception) {
                emitter.onError(error)
            }
        })
                .subscribe(
                        { result -> Log.d(TAG, "result = " + result) },
                        { error -> Log.e(TAG, "error = ${error.message}") },
                        { Log.d(TAG, "completed") }
                )

Operators

map

The RxJava2 way:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(@NonNull Integer integer) throws Exception {
                return "This is result " + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.e(TAG, "accept : " + s +"\n" );
            }
        });

With Kotlin:

Observable
        .just("apple", "orange", "mango")
        .map { fruit -> "eats $fruit" }
        .subscribe { delicious -> Log.d(TAG, "Ethan $delicious") }

flatMap

The RxJava2 way:

String url = "https://www.baidu.com";
Observable.just(url)
        .subscribeOn(Schedulers.io())
        .flatMap(new Function<String, Observable<Response>>() {
            @Override
            public Observable<Response> apply(@NonNull String url) throws Exception {
                // Use OkHttp3 to make the network request
                Request.Builder builder = new Request.Builder();
                Request request = builder.url(url).get().build();
                OkHttpClient client = new OkHttpClient.Builder().build();
                Response response = client.newCall(request).execute();
                // RxJava2 has no Observable.from(); wrap the single value with just()
                return Observable.just(response);
            }
        })
        .doOnNext(new Consumer<Response>() {
            @Override
            public void accept(Response response) throws Exception {
                // Save to the database
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Response>() {
            @Override
            public void accept(Response response) throws Exception {
                ResponseBody responseBody = response.body();
                // string() reads the body text; toString() would not.
                // In real code, consider reading the body before observeOn(),
                // since reading it here may do network I/O on the main thread.
                String result = responseBody.string();
            }
        });

With Kotlin:

val url = "https://www.baidu.com"
Observable.just(url)
        .subscribeOn(Schedulers.io())
        .flatMap { value ->
            // Use OkHttp3 to make the network request
            val builder = Request.Builder()
            val request = builder.url(value).get().build()
            val client = OkHttpClient.Builder().build()
            val response = client.newCall(request).execute()
            val responseBody = response.body()
            val result = responseBody?.string()
            // Observable.just() rejects null, so fall back to an empty string
            Observable.just(result ?: "")
        }
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({ result ->
            Log.d(TAG, result)
        }, { error ->
            Toast.makeText(this, error.message, Toast.LENGTH_SHORT).show()
        })

merge

The RxJava2 way:

final String[] aStrings = {"A1", "A2", "A3", "A4"};
        final String[] bStrings = {"B1", "B2", "B3"};

        final Observable<String> aObservable = Observable.fromArray(aStrings);
        final Observable<String> bObservable = Observable.fromArray(bStrings);

        Observable.merge(aObservable, bObservable)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "result = " + s);
                    }
                });

With Kotlin:

val aStrings = arrayOf("A1", "A2", "A3", "A4")
        val bStrings = arrayOf("B1", "B2", "B3")

        val aObservable = Observable.fromArray(*aStrings)
        val bObservable = Observable.fromArray(*bStrings)

        Observable.merge(aObservable, bObservable)
                .subscribe{value -> Log.d(TAG, "result = " + value)}

zip

The RxJava2 way:

private Observable<String> getStrObservable() {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext("A");
                    e.onNext("B");
                    e.onNext("C");
                }
            }
        });
    }

    private Observable<Integer> getIntObservable() {
        return Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext(1);
                    e.onNext(2);
                    e.onNext(3);
                    e.onNext(4);
                    e.onNext(5);
                }
            }
        });
    }

    private void test() {
        Observable.zip(getStrObservable(), getIntObservable(), new BiFunction<String, Integer, String>() {
            @Override
            public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
                return s + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String result) throws Exception {
                Log.d(TAG, "result " + result + "\n");
            }
        });
    }

With Kotlin:

private fun getStrObservable(): Observable<String> {
        return Observable.create { e ->
            if (!e.isDisposed) {
                e.onNext("A")
                e.onNext("B")
                e.onNext("C")
            }
        }
    }

    private fun getIntObservable(): Observable<Int> {
        return Observable.create { e ->
            if (!e.isDisposed) {
                e.onNext(1)
                e.onNext(2)
                e.onNext(3)
                e.onNext(4)
                e.onNext(5)
            }
        }
    }

    fun test() {
        Observable.zip(getStrObservable(), getIntObservable(), BiFunction<String, Int, String> {
                strValue, intValue -> strValue + intValue
        }).subscribe { result -> Log.d(TAG, "result " + result + "\n") }
    }

zipWith

The RxJava2 way:

private Observable<String> getStringObservable() {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext("Test");
                }
            }
        });
    }

    private class SomeClass {
        private String result;
        private int first, second;

        public SomeClass(int a, int b) {
            first = a;
            second = b;
        }

        public void setResult(String value) {
            result = value + (first + second);
        }

        public String getResult(){
            return result;
        }
    }

    List<SomeClass> list = new ArrayList<>();
            list.add(new SomeClass(1, 2));
            list.add(new SomeClass(3, 4));
            Observable
                    .fromIterable(list)
                    .zipWith(getStringObservable(), new BiFunction<SomeClass, String, SomeClass>() {
                        @Override
                        public SomeClass apply(@NonNull SomeClass someClass, @NonNull String s) throws Exception {
                            someClass.setResult(s);
                            return someClass;
                        }
                    })
                    .subscribe(new Consumer<SomeClass>() {
                        @Override
                        public void accept(SomeClass result) throws Exception {
                            Log.d(TAG, "result = " + result.getResult());
                        }
                    });

With Kotlin:

private fun getStringObservable(): Observable<String> {
        return Observable.create { e ->
            if (!e.isDisposed) {
                e.onNext("Test")
            }
        }
    }

    private inner class SomeClass(private val first: Int, private val second: Int) {
        var result: String? = null
            set(value) {
                field = value + (first + second)
            }
    }

    val list = listOf(SomeClass(1, 2), SomeClass(3, 4))
        Observable
                .fromIterable(list)
                .zipWith(getStringObservable(), BiFunction<SomeClass, String, SomeClass> { someClass, s ->
                    someClass.result = s
                    someClass
                })
                .subscribe { result -> Log.d(TAG, "result = " + result.result) }

concat

The RxJava2 way:

Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.d(TAG, "concat : "+ integer + "\n" );
                    }
                });

With Kotlin:

Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
                .subscribe{result -> Log.d(TAG, "result = " + result)}

filter

The RxJava2 way:

Observable.just(1, 2, 3, 4, 5)
               .filter(new Predicate<Integer>() {
                   @Override
                   public boolean test(@NonNull Integer integer) throws Exception {
                       return integer % 2 == 0;
                   }
               })
               .subscribe(new Consumer<Integer>() {
                   @Override
                   public void accept(Integer integer) throws Exception {
                       Log.d(TAG, "result = " + integer);
                   }
               });

With Kotlin:

Observable.just(1, 2, 3, 4, 5)
        .filter { value -> value % 2 == 0 }
        .subscribe { t -> Log.d(TAG, "result " + t) }

You can also use RxKotlin, which standardizes the conventions for using RxJava from Kotlin.
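As a rough illustration of what those conventions look like, the sketch below reworks the earlier fruit example with two RxKotlin extension functions, `toObservable()` and `subscribeBy`. It assumes RxKotlin 2.x, where they live in the `io.reactivex.rxkotlin` package (the package name differs in 3.x); `collectMeals` is a hypothetical helper written only for this example.

```kotlin
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable

// Hypothetical helper: runs the chain and collects every callback into a list.
fun collectMeals(): List<String> {
    val lines = mutableListOf<String>()
    listOf("apple", "orange", "mango")
        .toObservable()                      // Iterable<T> -> Observable<T>
        .map { fruit -> "eats $fruit" }
        .subscribeBy(                        // named callbacks instead of positional lambdas
            onNext = { lines.add("Ethan $it") },
            onError = { lines.add("error = ${it.message}") },
            onComplete = { lines.add("completed") }
        )
    return lines
}

fun main() {
    collectMeals().forEach { println(it) }
}
```

Named arguments make it clear which lambda handles values, errors, and completion, which is easy to get wrong with three positional lambdas.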
