RxJava操作符(十)自定义操作符

Rxjava允许我们来自定义操作符来满足我们特殊的需求。如果我们的自定义操作符想要作用到Observable发射出来的数据上,我们就要使用lift操作符;如果我们的自定义操作符想要改变整个的Observable,就需要使用compose操作符了。

一、 lift

当我们自定义了一个操作符后,使用lift可以将我们自定义的操作符和其它的操作符一起做链式调用,就好像Rxjava原生的操作符一样。下面我们自定义一个操作符,并使用lift添加到Observable的链式调用里面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private Observable<String> liftObserver() {
Operator<String, String> myOperator = new Operator<String, String>() {
@Override
public Subscriber<? super String> call(Subscriber<? super String> subscriber) {
return new Subscriber<String>(subscriber) {
@Override
public void onCompleted() {
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}

@Override
public void onError(Throwable e) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(e);
}
}

@Override
public void onNext(String s) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext("myOperator:" + s);
}
}
};
}

};
return Observable.just(1, 2, 3).map(integer -> "map1:" + integer).lift(myOperator).map(s -> "map2:" + s);
}

我们使用lift将我们自定义的操作符夹在两个map操作符之间,形成链式的调用。
对其进行订阅:

1
2
mLButton.setText("lift");
mLButton.setOnClickListener(e -> liftObserver().subscribe(s -> log("lift:" + s)));

运行结果如下,可以看到输出的结果也正如预期的一样,我们自定义的操作符作用在两个map操作符之间。

二、compose

Compose操作符是将源Observable按照自定义的方式转化成另外一个新的Observable。可以这么说compose是对Observable进行操作的而lift是对Subscriber进行操作的,作用点是不同的。

下面我们自定义一个Transformer对象,并使用compse操作符将其应用到Observable上:

1
2
3
4
5
6
7
8
9
10
11
private Observable<String> composeObserver() {
Transformer<Integer, String> myTransformer = new Transformer<Integer, String>() {
@Override
public Observable<String> call(Observable<Integer> integerObservable) {
return integerObservable
.map(integer -> "myTransforer:" + integer)
.doOnNext(s -> log("doOnNext:" + s));
}
};
return Observable.just(1, 2, 3).compose(myTransformer);
}

在这个我们自定义的Transformer对象中,我们将原先发射Integer的Observable转化成了一个发射String内容的Observable;并且我们还添加了doOnNext操作符来打印出发射的数据,方便我们进行后续的调试。个人感觉Transformer更像是一个批量转化器,如你有很多Observable对象在使用,可以定义一个通用的Transformer对象,里面可以通过doOnNext打印数据,可以定义subscribeOn和observeOn的线程等等,最后使用compose操作符将其应用到我们所有的Observable对象上就可以统一进行设定了。
下面我们进行一下订阅吧:

1
2
mRButton.setText("compose");
mRButton.setOnClickListener(e -> composeObserver().subscribe(s -> log("compose:" + s)));

运行的结果如下,我们可以看到对每个数据都将其转化为String之后通过doOnNext输出了数据的值,然后输出了Subscriber接收到的数据。

关于自定义操作符还有一些需要注意的地方:

  • 自定义Operator在发射任何数据之前都要使用!subscriber.isUnsubscribed()来检查Subscriber的状态,如果没有任何Subscriber订阅就没有必要去发射数据了
  • 自定义Operator要遵循Observable的核心原则:

    • 可以多次调用Subscriber的onNext方法,但是同一个数据只能调用一次。
    • 可以调用Subscriber的onComplete或者onError方法,但是这两个是互斥的,调用了一个就不能再调用另外一个了,并且一旦调用了任何一个方法就不能再调用onNext方法了。
    • 如果无法保证遵守以上两条,可以对自定义操作符加上serialize操作符,这个操作符会强制发射正确的数据。
  • 自定义Operator内部不能阻塞住。

  • 如果通过compose组合多个操作符就能达到目的就不要自己去写新的代码来实现,在Rxjava的源码中就有很多这样的例子,如:
    • first()操作符是通take(1).single()来实现的。
    • ignoreElements()是通过 filter(alwaysFalse())来实现的。
    • reduce(a) 是通过 scan(a).last()来实现的。
  • 当有异常的时候,不能继续发射正常的数据,要立刻调用Subscriber的onError方法将异常抛出去。
  • 注意发射数据为null的情况,这和完全不发射数据不是一回事。

关于自定义操作符就到这里了,本文的demo程序见github