Introduction

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

现在可以看到很多开源库是以 ReactiveX 开头的, 一般简写为 Rx, 比如 RxJavaRxAndroid 等等。RxJava 应该算是里面应用较为广泛且完善程度较高的。

Rx 库的特点是:观察者模式、异步和流式结构。

RxJava 官方仓库

Overview

先对 RxJava 的机制做一个粗略的概括,方便下面逐步展开。

把 RxJava 的调用链看成一个句子,Observable 相当于主语Observer 相当于宾语,而 subscribe() 方法相当于谓语,把主语和宾语连接起来。当然在这个连接作用之前、之后和中间都可以做一些其他的处理,例如 map 操作符等,不过在这里先把这三者的关系理清即可。

Observable

Observable被观察者,是 RxJava 机制中的事件的触发源。

新建一个 Observable 有以下几种方式:

create

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("Android");
        e.onNext("iOS");
        e.onComplete();
    }
});

一个 ObservableOnSubscribe<T> (接口)作为参数传入,在其 subscribe 方法中,依次调用了两次 onNext() 和一次 onComplete() 方法。其实这里的 subscribe 方法相当于定义一个计划表,规定了在订阅之后 Observer 要干些什么以及干的顺序是怎么样的。

just

Observable<String> observable1 = Observable.just("Android", "iOS");

效果跟上一个例子完全一样。

fromXXX

String[] systems = {"Android", "iOS"};
Observable<String> observable2 = Observable.fromArray(systems);

效果与上面两个例子一样。

其实只要泛型指定正确,可以用任何类型的 Array,还可以 fromIterable() 等。

Observer

Observer观察者,一般是事件流的终点。

可以以以下方式新建一个 Observer(Observer是一个接口):

Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        System.out.println("OBSERVER = ON SUBSCRIBE");
    }

    @Override
    public void onNext(String s) {
        System.out.println("OBSERVER = " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {
        System.out.println("OBSERVER = ON COMPLETE");
    }
};

对于 Observer 而言,是按 onSubscribe->onNext->onComplete/onError 的顺序执行的(onComplete 和 onError 只会执行其中之一)。在 onSubscribe 方法中,参数 Disposable 运行随时打断这个执行链。

subscribe

上面说到,通过 Observable.subscribe(observer) 方法即可把观察者和被观察者关系起来,即观察者去订阅被观察者。

乍一看,这个语句似乎变成了被观察者去订阅观察者了,确实如此,但这是为了保证 Observable 的链式调用。

这样写的话我们可以:

Observable.create(xxx)
	.map()
	...
	.subscribe(observer);

而不是:

Observable observable = Observable.create(xxx)
				.map()
				...;
observer.subscribe(observable);

另外,RxJava 允许订阅非完整的 Observer。比如,你只关注 onNext 中做了什么,而不需要 onComplete/onError 等方法,这个时候这个特点就有用处了,不用去定义一个完整的 Observer。

Observable.subscribe() 允许的参数类型如下:

Observable.subscribe();
Observable.subscribe(Consumer<T> onNext);
Observable.subscribe(Consumer<T> onNext, Consumer<Throwable> onError);
Observable.subscribe(Consumer<T> onNext, Consumer<Throwable> onError, Action onComplete);

需要对 Consumer 和 Action 作一下解释。

它们都是 RxJava 定义的接口,其中前者有一个回调方法 accept(T param),后者有一个回调方法 run(),区别就在于参数的个数不同。Consumer 有 1 个参数,Action 没有参数,还有一个 BiConsumer 有 2 个参数。

注意,由于 RxJava2.0 对 API 作了修改,Consumer 对应 1.0 的 Action1,Action 对应 Actiion0,BiConsumer 对应 Action2。

下一篇将介绍 RxJava 的变换操作符。

参考文献:给 Android 开发者的 RxJava 详解