Rxjava 很大的一个特点就是可以通过 observeOn 和 subscribeOn 操作符结合Scheduler来方便地切换线程,这也是很多人选择使用Rxjava的主要原因之一。但是如果Observable中封装了回调方法,那我们就需要注意了,操作符可能并不会如我们预期的那样运行在对应的线程上。
简单的Scheduler切换线程
让我们首先来看看下面的代码:创建了一个自定义的Observable发送出一个数字1然后通过map操作符转化成一个字符串,之后将其订阅在Computation线程上并在IO线程上接收最终结果。
1 | Observable.create { emitter -> |
想必读者也可以很容易得出跟运行结果一致的结论,自定义的Observer和map都试运行在相同的Computation线程上,而Subscriber则是运行在IO线程上。
Current Thread in Observable: RxComputationThreadPool-2
Current Thread in map: RxComputationThreadPool-2
Current Thread in subscriber: RxCachedThreadScheduler-1
Number: $1
加入回调
首先创建一个Store对象,模拟需要注册listener接收回调的对象。在这里创建了一个自定义的Scheduler: main,用来模拟主线程。
Store对象在初始化1秒之后会在主线程上通知所有的listener初始化完成。
1 | class Store { |
下面在Observer里加上一个回调的逻辑,在回调里发送数据,这也是一种很简单的将回调封装成Observable的方法。
1 | val store = Store() |
请读者思考一下这里面加log的地方都会运行在什么线程上呢?
思考完毕后来看一下实际的运行结果,是否跟预期的不一样呢?
Current Thread in Observable: RxComputationThreadPool-2
Current Thread in onInit: main
Current Thread in map: main
Current Thread in subscriber: RxCachedThreadScheduler-1
Number: $1
可以看到不仅onInit回调是运行在main线程上,map也是运行main线程上!这到底是怎么回事呢?
这是因为onInit回调方法由Store对象在main线程上调用的,所以其必然是运行在main线程上。然后在回调里直接调用onNext发送数据当然也是在main线程上,自然后续的map操作也是在main线程上。所以如果以为在Computation线程上注册的Observer就可以在回调方法或者map里做一些比较耗时的操作就错了,这会导致堵塞主线程并造成ANR的后果。
那如果必须要在map操作符里做一些耗时的操作该怎么办呢?这时候只需要将observeOn挪到map之上就可以了。这是因为observeOn会对下游造成影响,在这里使用observeOn切换线程之后,下面的操作符都将运行在这个线程上。
.observeOn(Schedulers.io())
.map {
println("Current Thread in map: ${Thread.currentThread().name}")
"Number: $1"
}
再次运行程序就会得到如下的运行结果,这跟我们之前想要的效果就一致了。
Current Thread in Observable: RxComputationThreadPool-2
Current Thread in onInit: main
Current Thread in map: RxCachedThreadScheduler-1
Current Thread in subscriber: RxCachedThreadScheduler-1
Number: $1
通过上述的实验,我们可以得出如下结论:
- 回调方法的运行线程是由调用方决定的。
- 将回调封装成Observable并在回调方法里发送数据是运行在回调方线程上的,并会对后续的操作符造成影响。
- 想要改变后续操作符的线程应该尽早使用observeOn来进行线程的切换。