RxJava操作符(九)Connectable Observable Operators

一、Publish

首先我们有必要来了解一下什么是Connectable Observable: 就是一种特殊的Observable对象,并不是Subscrib的时候就发射数据,而是只有对其应用connect操作符的时候才开始发射数据,所以可以用来更灵活的控制数据发射的时机。而Publish操作符就是用来将一个普通的Observable对象转化为一个Connectable Observable。需要注意的是如果发射数据已经开始了再进行订阅只能接收以后发射的数据。

二、 Connect

就如上文所述,Connect操作符就是用来触发Connectable Observable发射数据的。应用Connect操作符后会返回一个Subscription对象,通过这个Subscription对象,我们可以调用其unsubscribe方法来终止数据的发射。另外,如果还没有订阅者订阅的时候就应用Connect操作符也是可以使其开始发射数据的。

下面我们使用publish操作符创建一个Connectable Observable:

1
2
3
4
5
private ConnectableObservable<Long> publishObserver() {
Observable<Long> obser = Observable.interval(1, TimeUnit.SECONDS);
obser.observeOn(Schedulers.newThread());
return obser.publish();
}

然后创建两个Action1对象,在不同的时机对其进行订阅:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ConnectableObservable<Long> obs = publishObserver();
Action1 action2 = o -> log("action2:" + o);
Action1 action1 = o -> {
log("action1:" + o);
if ((long) o == 3) obs.subscribe(action2);
};
obs.subscribe(action1);

mLButton.setText("start");
mLButton.setOnClickListener(e -> mSubscription = obs.connect());
mRButton.setText("stop");
mRButton.setOnClickListener(e -> {
if (mSubscription != null) {
mSubscription.unsubscribe();
}
});

运行结果如下,当我们点击start按钮的时候对这个Connectable Observable 对象应用connect操作符,让其开始发射数据。当发射到3的时候将action2给订阅上,这个两个订阅者将同时收到相同的数据。点击stop按钮的时候终止其数据的发射。

三、RefCount

RefCount操作符就是将一个Connectable Observable 对象再重新转化为一个普通的Observable对象,这时候如果由订阅者进行订阅将会触发数据的发射。

下面我们首先如上文一样使用publish创建一个Connectable Observable 对象,然后再使用RefCount将其转化为一个普通的Observable对象。

1
2
3
4
5
6
7
ConnectableObservable<Long> obs = publishObserver();
mLButton.setText("refCount");
mLButton.setOnClickListener(e -> subscription = obs.refCount().subscribe(aLong -> {
log("refCount:" + aLong);
}));
mRButton.setText("stop");
mRButton.setOnClickListener(e -> subscription.unsubscribe());

运行结果如下所示,当我们进行订阅后就会触发其发射数据,点击stop按钮终止数据的发射。

四、Replay

Replay操作符返回一个Connectable Observable 对象并且可以缓存其发射过的数据,这样即使有订阅者在其发射数据之后进行订阅也能收到其之前发射过的数据。不过使用Replay操作符我们最好还是限定其缓存的大小,否则缓存的数据太多了可会占用很大的一块内存。对缓存的控制可以从空间和时间两个方面来实现。

下面我们使用Relay来创建两个Connectable Observable 对象并且分别从空间和时间上来控制其缓存的大小,前一个我们控制缓存的大小为2,后一个我们控制缓存的时间为3秒:

1
2
3
4
5
6
7
8
9
10
11
private ConnectableObservable<Long> relayCountObserver() {
Observable<Long> obser = Observable.interval(1, TimeUnit.SECONDS);
obser.observeOn(Schedulers.newThread());
return obser.replay(2);
}

private ConnectableObservable<Long> relayTimeObserver() {
Observable<Long> obser = Observable.interval(1, TimeUnit.SECONDS);
obser.observeOn(Schedulers.newThread());
return obser.replay(3, TimeUnit.SECONDS);
}

创建两个Action1对象,并分别进行订阅, 其中第二个Action1对象会在发射5个数据后才进行订阅:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Action1 action2 = o -> log("action2:" + o);
Action1 action1 = o -> {
log("action1:" + o);
if ((long) o == 5) obs.subscribe(action2);
};

mLButton.setText("relayCount");
mLButton.setOnClickListener(e -> {
obs = relayCountObserver();
obs.subscribe(action1);
log("relayCount");
mSubscription = obs.connect();
});
mRButton.setText("relayTime");
mRButton.setOnClickListener(e -> {
obs = relayTimeObserver();
obs.subscribe(action1);
log("relayTime");
mSubscription = obs.connect();
});

运行结果如下,我们可以看到第一个是限定缓存数目为2,所以action2在订阅后会立刻接收到4和5这两个数据;
而第二个是限定缓存时间为3秒,所以所以action2在订阅后也会立刻接收到3、4、5这三个数据(3有可能会过期)。

关于Connectable Observable的操作符就到这里了,本文的demo程序见github