RxJava - 梗概
- coding
- android
- rxjava
Introduction
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
现在可以看到很多开源库是以 ReactiveX
开头的, 一般简写为 Rx
, 比如 RxJava
, RxAndroid
等等。RxJava
应该算是里面应用较为广泛且完善程度较高的。
Rx
库的特点是:观察者模式、异步和流式结构。
Overview
先对 RxJava 的机制做一个粗略的概括,方便下面逐步展开。
把 RxJava 的调用链看成一个句子,Observable
相当于主语,Observer
相当于宾语,而 subscribe()
方法相当于谓语,把主语和宾语连接起来。当然在这个连接作用之前、之后和中间都可以做一些其他的处理,例如 map
操作符等,不过在这里先把这三者的关系理清即可。
Observable
Observable
,被观察者,是 RxJava 机制中的事件的触发源。
新建一个 Observable 有以下几种方式:
create
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Android");
e.onNext("iOS");
e.onComplete();
}
});
一个 ObservableOnSubscribe<T>
(接口)作为参数传入,在其 subscribe
方法中,依次调用了两次 onNext()
和一次 onComplete()
方法。其实这里的 subscribe 方法相当于定义一个计划表,规定了在订阅之后 Observer
要干些什么以及干的顺序是怎么样的。
just
Observable<String> observable1 = Observable.just("Android", "iOS");
效果跟上一个例子完全一样。
fromXXX
String[] systems = {"Android", "iOS"};
Observable<String> observable2 = Observable.fromArray(systems);
效果与上面两个例子一样。
其实只要泛型指定正确,可以用任何类型的 Array,还可以 fromIterable()
等。
Observer
Observer
,观察者,一般是事件流的终点。
可以以以下方式新建一个 Observer(Observer是一个接口):
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("OBSERVER = ON SUBSCRIBE");
}
@Override
public void onNext(String s) {
System.out.println("OBSERVER = " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("OBSERVER = ON COMPLETE");
}
};
对于 Observer 而言,是按 onSubscribe->onNext->onComplete/onError
的顺序执行的(onComplete 和 onError 只会执行其中之一)。在 onSubscribe
方法中,参数 Disposable
运行随时打断这个执行链。
subscribe
上面说到,通过 Observable.subscribe(observer)
方法即可把观察者和被观察者关系起来,即观察者去订阅被观察者。
乍一看,这个语句似乎变成了被观察者去订阅观察者了,确实如此,但这是为了保证 Observable
的链式调用。
这样写的话我们可以:
Observable.create(xxx)
.map()
...
.subscribe(observer);
而不是:
Observable observable = Observable.create(xxx)
.map()
...;
observer.subscribe(observable);
另外,RxJava 允许订阅非完整的 Observer。比如,你只关注 onNext
中做了什么,而不需要 onComplete/onError
等方法,这个时候这个特点就有用处了,不用去定义一个完整的 Observer。
Observable.subscribe()
允许的参数类型如下:
Observable.subscribe();
Observable.subscribe(Consumer<T> onNext);
Observable.subscribe(Consumer<T> onNext, Consumer<Throwable> onError);
Observable.subscribe(Consumer<T> onNext, Consumer<Throwable> onError, Action onComplete);
需要对 Consumer 和 Action 作一下解释。
它们都是 RxJava 定义的接口,其中前者有一个回调方法 accept(T param)
,后者有一个回调方法 run()
,区别就在于参数的个数不同。Consumer 有 1 个参数,Action 没有参数,还有一个 BiConsumer 有 2 个参数。
注意,由于 RxJava2.0 对 API 作了修改,Consumer 对应 1.0 的 Action1,Action 对应 Actiion0,BiConsumer 对应 Action2。
下一篇将介绍 RxJava 的变换操作符。