基本用法
1 | "users/{username}/repos") ( |
1 | public class MainActivity extends AppCompatActivity { |
以 github 的 API为例,获取指定用户下所有的仓库
https://api.github.com/users/wy521angel/repos 代码如上所示,Single 和 Observable 都是被观察者、被订阅者,Single 只回调 onSuccess,而 Observable 还会回调 onNext 和 onComplete。RxJava 可以对事件流进行处理,即一个一个连续的事件,onNext 表示每一个新的事件,而 onComplete 表示事件序列处理结束了,一般来说网络请求只有一个事件,使用 Single 更加方便。Single 在 RxJava2 开始出现。
onSuccess、onError 运行的线程由 observeOn 指定,api.getRepos(“wy521angel”) 由 subscribeOn 指定。而 onSubscribe 不指定,即代码运行在哪个线程便在那个线程,相当于将该回调中的代码 textView.setText(“正在请求”); 等写在外面即 api.getRepos(“wy521angel”) 上面。
onSubscribe 回调内的 Disposable 表示丢弃,例如当 Activity 关闭时,不需要再进行网络请求,可以通过 dispose() 方法来让上游停止工作,让上游不再生产,达到“丢弃”的效果。
RxJava 对象的创建(通过 Single.just 举例)
just 拿到一个对象,在订阅的瞬间便发射出去,并不像网络请求,订阅开始之后执行网路请求,有等待延迟,just 可以发射一个或者多个对象。代码如下所示:
1 | Single.just("1").subscribe(new SingleObserver<String>() { |
上面代码没有涉及到网络操作,可以不切线程。在当前线程发射当前线程获取。上述代码可以拆成两行代码,如下所示:
1 | Single<String> single = Single.just("1"); |
第一行获取被观察者 Single 对象,第二行订阅。如果需要切换线程,可以写成如下代码:
1 | Single<String> single = Single.just("1"); |
RxJava 可以拆写成一行一行的形式,just 方法代码如下:
1 | public static <T> Single<T> just(final T item) { |
第一行判空,just 不允许传入 null,onAssembly 是一个钩子方法,内部的 apply 只是做了类型转换,关键是 SingleJust 对象,代码如下:
1 | public final class SingleJust<T> extends Single<T> { |
SingleJust 继承自 Single,subscribeActual
是订阅过程中实际执行逻辑代码的方法,subscribe 方法代码如下:
1 |
|
第一行判空,第二行钩子方法,第三行再次判空,subscribe 方法来自 SingleJust,因为之前是使用 Just 方法创建的对象,subscribeActual 就是看 SingleJust 中的实现:
1 | observer.onSubscribe(Disposables.disposed()); |
observer 就是订阅时传入的 new SingleObserver,可以看出 subscribeActual 依次调用了 SingleObserver 回调中的 onSubscribe、onSuccess 方法。这是最简单的一个创建事件序列的方法。Disposables.disposed() 由前面可知是“丢弃”,Just 在订阅的一瞬间就被丢弃了,不必要我们手动丢弃,因为事件序列只有一个事件并且不需要延迟,第二行直接返回 onSuccess,在订阅的一瞬间就可以很确定可以将结果返回,此处的返回值是代码中 String 类型的1。这是一个比较特殊的情况。SingleJust 没有 onError 发送,因为它是不可能失败的。subscribeActual 方法实现了观察者和被观察者的连接。
如下图所示,Single.just() 和 new SingleObserver() 分别创建了Single<String> 和 SingleObserver<String> 两个对象,调用 single.subscribe(observer) 后,Single<String> 向 SingleObserver<String> 发送 onSubscribe 和 onSuccess 两个事件,于是整个事件序列便形成了。
RxJava 的整体结构是一条链,其中:
- 链的最上游:生产者 Observable;
- 链的最下游:观察者 Observer;
- 链的中间:各个中介节点,既是下游的 Observable,又是上游的 Observer。
操作符是如何工作的(以 map 为例)
操作符对整个数据流或者数据流中具体的数据进行操作。
示例如下:
1 | Single.just(1).map(new Function<Integer, String>() { |
map 操作结构如下:
此处 single.subscribe(observer) 已经不是由 Single<Integer> 调用了,而是由 Single<String> 调用,所以在调用了 single.subscribe(observer) 后,是 Single<String> 向 SingleObserver<String> 发送事件,而不是 Single<Integer>,map 操作符中,新建了 SingleMap,SingleMap 的 源码如下:
1 | public final class SingleMap<T, R> extends Single<R> { |
此处 source 指的是在做 map 操作之前的源,本例中就是 Single.just(),mapper 指转换器,本例中指的是将 Integer 转换成 String 的方法。subscribeActual 中的 source.subscribe ,执行的就是 just 中的 subscribeActual 方法。map 在被订阅时,会让自己的上游被订阅,single.subscribe(observer) 由 Single<String> 调用,Single<String> 做的事是让 Single<Integer> 再调用一次,并将 MapSingleObserver 作为参数传入。MapSingleObserver 源码如下:
1 | static final class MapSingleObserver<T, R> implements SingleObserver<T> { |
可以看到,在 map 上游调用 map 的 onSubscribe 时,map 会调用自己下游的 onSubscribe,并且将上游的 Disposable 直接向下传;onSuccess 中的 mapper 在本例中就是 String.valueOf(integer),并且再调用下游的 onSuccess,onError 同理,map 就是一个做“转接”的人,把上游和下游做了连接,一般用来做数据转换。
操作符 Operator(map() 等等)基于原 Observable 创建一个新的 Observable,Observable 内部创建一个 Observer,通过定制 Observable 的 subscribeActual() 方法和 Observer 的 onXxx() 方法,来实现自己的中介角色(例如数据转换、线程切换)。
线程切换
之前在做线程切换时,请求在 io 线程,回调在主线程,代码如下:
1 | ... |
事实上可以设置 RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()),这样所有请求都在 io 线程,省去了每次做请求写 subscribeOn(Schedulers.io()) 代码。
subscribeOn()
原理:在 Scheduler 指定的线程里启动 subscribe()。
效果:
- 切换起源 Observable 的线程;
- 当多次调用 subscribeOn() 的时候,只有最上面的会对起源 Observable 起作用,即第一次写的 subscribeOn(),结构图如下,不同颜色箭头代表不同的线程:
observeOn()
原理:在内部创建的 Observer 的 onNext()、onError()、onSuccess() 等回调方法里,通过 Scheduler 指定的线程来调用下级 Observer 的对应回调方法。
效果:
- 切换 observeOn() 下面的 Observer 的回调所在的线程
- 当多次调用 observeOn() 的时候,每个都会进行一次线程切换,影响范围是它下面的每个 Observer (除非又遇到新的 observeOn()),结构图如下,不同颜色箭头代表不同的线程:
Scheduler 的原理
Schedulers.newThread() 和 Schedulers.io():
- 当 scheduleDirect() 被调用的时候,会创建一个 Worker, Worker 的内部会有一个 Executor,由 Executor 来完成实际的线程切换;
- scheduleDirect() 还会创建出一个 Disposable 对象,交给外层的 Observer,让它能执行 dispose() 操作,取消订阅链;
- newThread() 和 io() 的区别在于,io() 可能会对 Executor 进行重用。
AndroidSchedulers.mainThread():
- 通过内部的 Handler 把任务发送到主线程去做。
相关代码详见 MainActivity。
参考资料:
腾讯课堂 HenCoder