RxJava - 梗概
- coding
- android
- rxjava
Introduction
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
现在可以看到很多开源库是以 ReactiveX 开头的, 一般简写为 Rx, 比如 RxJava, RxAndroid 等等。RxJava 应该算是里面应用较为广泛且完善程度较高的。
Rx 库的特点是:观察者模式、异步和流式结构。
Overview
先对 RxJava 的机制做一个粗略的概括,方便下面逐步展开。
把 RxJava 的调用链看成一个句子,Observable 相当于主语,Observer 相当于宾语,而 subscribe() 方法相当于谓语,把主语和宾语连接起来。当然在这个连接作用之前、之后和中间都可以做一些其他的处理,例如 map 操作符等,不过在这里先把这三者的关系理清即可。
Observable
Observable,被观察者,是 RxJava 机制中的事件的触发源。
新建一个 Observable 有以下几种方式:
create
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Android");
e.onNext("iOS");
e.onComplete();
}
});
一个 ObservableOnSubscribe<T> (接口)作为参数传入,在其 subscribe 方法中,依次调用了两次 onNext() 和一次 onComplete() 方法。其实这里的 subscribe 方法相当于定义一个计划表,规定了在订阅之后 Observer 要干些什么以及干的顺序是怎么样的。
just
Observable<String> observable1 = Observable.just("Android", "iOS");
效果跟上一个例子完全一样。
fromXXX
String[] systems = {"Android", "iOS"};
Observable<String> observable2 = Observable.fromArray(systems);
效果与上面两个例子一样。
其实只要泛型指定正确,可以用任何类型的 Array,还可以 fromIterable() 等。
Observer
Observer,观察者,一般是事件流的终点。
可以以以下方式新建一个 Observer(Observer是一个接口):
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("OBSERVER = ON SUBSCRIBE");
}
@Override
public void onNext(String s) {
System.out.println("OBSERVER = " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("OBSERVER = ON COMPLETE");
}
};
对于 Observer 而言,是按 onSubscribe->onNext->onComplete/onError 的顺序执行的(onComplete 和 onError 只会执行其中之一)。在 onSubscribe 方法中,参数 Disposable 运行随时打断这个执行链。
subscribe
上面说到,通过 Observable.subscribe(observer) 方法即可把观察者和被观察者关系起来,即观察者去订阅被观察者。
乍一看,这个语句似乎变成了被观察者去订阅观察者了,确实如此,但这是为了保证 Observable 的链式调用。
这样写的话我们可以:
Observable.create(xxx)
.map()
...
.subscribe(observer);
而不是:
Observable observable = Observable.create(xxx)
.map()
...;
observer.subscribe(observable);
另外,RxJava 允许订阅非完整的 Observer。比如,你只关注 onNext 中做了什么,而不需要 onComplete/onError 等方法,这个时候这个特点就有用处了,不用去定义一个完整的 Observer。
Observable.subscribe() 允许的参数类型如下:
Observable.subscribe();
Observable.subscribe(Consumer<T> onNext);
Observable.subscribe(Consumer<T> onNext, Consumer<Throwable> onError);
Observable.subscribe(Consumer<T> onNext, Consumer<Throwable> onError, Action onComplete);
需要对 Consumer 和 Action 作一下解释。
它们都是 RxJava 定义的接口,其中前者有一个回调方法 accept(T param),后者有一个回调方法 run(),区别就在于参数的个数不同。Consumer 有 1 个参数,Action 没有参数,还有一个 BiConsumer 有 2 个参数。
注意,由于 RxJava2.0 对 API 作了修改,Consumer 对应 1.0 的 Action1,Action 对应 Actiion0,BiConsumer 对应 Action2。
下一篇将介绍 RxJava 的变换操作符。