Rxjava中封装回调需要注意的线程问题

Rxjava 很大的一个特点就是可以通过 observeOn 和 subscribeOn 操作符结合Scheduler来方便地切换线程,这也是很多人选择使用Rxjava的主要原因之一。但是如果Observable中封装了回调方法,那我们就需要注意了,操作符可能并不会如我们预期的那样运行在对应的线程上。

简单的Scheduler切换线程

让我们首先来看看下面的代码:创建了一个自定义的Observable发送出一个数字1然后通过map操作符转化成一个字符串,之后将其订阅在Computation线程上并在IO线程上接收最终结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable.create { emitter ->
println("Current Thread in Observable: ${Thread.currentThread().name}")
emitter.onNext(1)
emitter.onComplete()
}.map {
println("Current Thread in map: ${Thread.currentThread().name}")
"Number: $1"
}
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.subscribe {
println("Current Thread in subscriber: ${Thread.currentThread().name}")
println(it)
countDownLatch.countDown()
}

想必读者也可以很容易得出跟运行结果一致的结论,自定义的Observer和map都试运行在相同的Computation线程上,而Subscriber则是运行在IO线程上。

Current Thread in Observable: RxComputationThreadPool-2
Current Thread in map: RxComputationThreadPool-2
Current Thread in subscriber: RxCachedThreadScheduler-1
Number: $1

加入回调

首先创建一个Store对象,模拟需要注册listener接收回调的对象。在这里创建了一个自定义的Scheduler: main,用来模拟主线程。

Store对象在初始化1秒之后会在主线程上通知所有的listener初始化完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Store {
private val listeners = mutableListOf<InitListener>()
private val main = Schedulers.from(Executors.newSingleThreadExecutor { Thread(it, "main") })

init {
Observable.just(true).delay(1, TimeUnit.SECONDS)
.observeOn(main)
.subscribe {
listeners.forEach { listener -> listener.onInit(it) }
}
}

fun registerListener(listener: InitListener) {
listeners.add(listener)
}

fun removeListener(listener: InitListener) {
listeners.remove(listener)
}
}

下面在Observer里加上一个回调的逻辑,在回调里发送数据,这也是一种很简单的将回调封装成Observable的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
val store = Store()

Observable.create { emitter ->
println("Current Thread in Observable: ${Thread.currentThread().name}")
store.registerListener(object : InitListener {
override fun onInit(succeed: Boolean) {
println("Current Thread in onInit: ${Thread.currentThread().name}")
emitter.onNext(1)
emitter.onComplete()
}
})
}.map {
println("Current Thread in map: ${Thread.currentThread().name}")
"Number: $1"
}
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.subscribe {
println("Current Thread in subscriber: ${Thread.currentThread().name}")
println(it)
countDownLatch.countDown()
}

请读者思考一下这里面加log的地方都会运行在什么线程上呢?

思考完毕后来看一下实际的运行结果,是否跟预期的不一样呢?

Current Thread in Observable: RxComputationThreadPool-2
Current Thread in onInit: main
Current Thread in map: main
Current Thread in subscriber: RxCachedThreadScheduler-1
Number: $1

可以看到不仅onInit回调是运行在main线程上,map也是运行main线程上!这到底是怎么回事呢?

这是因为onInit回调方法由Store对象在main线程上调用的,所以其必然是运行在main线程上。然后在回调里直接调用onNext发送数据当然也是在main线程上,自然后续的map操作也是在main线程上。所以如果以为在Computation线程上注册的Observer就可以在回调方法或者map里做一些比较耗时的操作就错了,这会导致堵塞主线程并造成ANR的后果。

那如果必须要在map操作符里做一些耗时的操作该怎么办呢?这时候只需要将observeOn挪到map之上就可以了。这是因为observeOn会对下游造成影响,在这里使用observeOn切换线程之后,下面的操作符都将运行在这个线程上。

.observeOn(Schedulers.io())
.map {
     println("Current Thread in map: ${Thread.currentThread().name}")
     "Number: $1"
 }

再次运行程序就会得到如下的运行结果,这跟我们之前想要的效果就一致了。

Current Thread in Observable: RxComputationThreadPool-2
Current Thread in onInit: main
Current Thread in map: RxCachedThreadScheduler-1
Current Thread in subscriber: RxCachedThreadScheduler-1
Number: $1

通过上述的实验,我们可以得出如下结论:

  1. 回调方法的运行线程是由调用方决定的。
  2. 将回调封装成Observable并在回调方法里发送数据是运行在回调方线程上的,并会对后续的操作符造成影响。
  3. 想要改变后续操作符的线程应该尽早使用observeOn来进行线程的切换。