RxJava操作符(二)Transforming Observables

上一篇文章中,我们了解了如何创建Observable,仅仅创建一个Observable可能无法满足一些复杂的场景,所以我们很可能需要将创建的Observable安装某种规则转化一下来发射数据。在这篇文章里我们来了解一下如何来转化Observable

一、Buffer

顾名思义,Buffer操作符所要做的事情就是将数据安装规定的大小做一下缓存,然后将缓存的数据作为一个集合发射出去。如下图所示,第一张示例图中我们指定buffer的大小为3,收集到3个数据后就发射出去。

第二张图中我们加入了一个skip参数用来指定每次发射一个集合需要跳过几个数据,图中如何指定count为2,skip为3,就会每3个数据发射一个包含两个数据的集合,如果count==skip的话,我们就会发现其等效于第一种情况了。

buffer不仅仅可以通过数量规则来缓存,还可以通过时间等规则来缓存,如规定3秒钟缓存发射一次等,见下面代码,我们创建了两个Observable,并使用buffer对其进行转化,第一个通过数量来缓存,第二个通过时间来缓存。

1
2
3
4
5
6
7
8
private Observable<List<Integer>> bufferObserver() {
return Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).buffer(2, 3);
}

private Observable<List<Long>> bufferTimeObserver() {
return Observable.interval(1, TimeUnit.SECONDS).buffer(3,
TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread());
}

对其进行订阅:

1
2
3
4
mLButton.setText("buffer");
mLButton.setOnClickListener(e -> bufferObserver().subscribe(i -> log("buffer:" + i)));
mRButton.setText("bufferTime");
mRButton.setOnClickListener(e -> bufferTimeObserver().subscribe(i -> log("bufferTime:" + i)));

运行结果如下,可以看到第一个Observable会每隔3个数字发射出前两个数字;第二个Observable会每隔3秒钟输出2~4个数字。

二、FlatMap

FlatMap是一个用处多多的操作符,可以将要数据根据你想要的规则进行转化后再发射出去。其原理就是将这个Observable转化为多个以原Observable发射的数据作为源数据的Observable,然后再将这多个Observable发射的数据整合发射出来,需要注意的是最后的顺序可能会交错地发射出来,如果对顺序有严格的要求的话可以使用concatmap操作符。FlatMapIterable和FlatMap基本相同,不同之处为其转化的多个Observable是使用Iterable作为源数据的。

下面我们分别使用FlatMap和FlatMapIterable创建并转化两个Observable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Observable<String> flatMapObserver() {
return Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.flatMap(integer -> Observable.just("flat map:" + integer));
}

private Observable<? extends Integer> flatMapIterableObserver() {
return Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.flatMapIterable(
integer -> {
ArrayList<Integer> s = new ArrayList<>();
for (int i = 0; i < integer; i++) {
s.add(integer);
}
return s;
}
);
}

分别对其进行订阅:

1
2
3
4
mLButton.setText("flatMap");
mLButton.setOnClickListener(e -> flatMapObserver().subscribe(i -> log(i)));
mRButton.setText("flatMapIterable");
mRButton.setOnClickListener(e -> flatMapIterableObserver().subscribe(i -> log("flatMapIterable:" + i)));

运行后的结果如下所示,第一个操作符将发射的数据都加上了个flat map的字符串前缀,第二个将数据做了扩展,会输出n个n数字。

三、GroupBy

GroupBy操作符将原始Observable发射的数据按照key来拆分成一些小的Observable,然后这些小的Observable分别发射其所包含的的数据,类似于sql里面的groupBy。
在使用中,我们需要提供一个生成key的规则,所有key相同的数据会包含在同一个小的Observable种。另外我们还可以提供一个函数来对这些数据进行转化,有点类似于集成了flatMap。

下面创建两个经过groupBy转化的Observable对象,第一个按照奇数偶数分组,第二个分组后将数字加上一个字符串前缀

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
mLButton.setText("groupBy");
mLButton.setOnClickListener(
e -> groupByObserver().subscribe(new Subscriber<GroupedObservable<Integer, Integer>>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(GroupedObservable<Integer, Integer> groupedObservable) {
groupedObservable.count()
.subscribe(integer ->
log("key" + groupedObservable.getKey() + " contains:" + integer + " numbers"));
}
}));
mRButton.setText("groupByKeyValue");
mRButton.setOnClickListener(e -> groupByKeyValueObserver()
.subscribe(new Subscriber<GroupedObservable<Integer, String>>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(GroupedObservable<Integer, String> integerIntegerGroupedObservable) {
if (integerIntegerGroupedObservable.getKey() == 0) {
integerIntegerGroupedObservable.subscribe(integer -> log(integer));
}
}
}));
}

private Observable<GroupedObservable<Integer, Integer>> groupByObserver() {
return Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).groupBy(integer -> integer % 2);
}

private Observable<GroupedObservable<Integer, String>> groupByKeyValueObserver() {
return Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.groupBy(integer -> integer % 2, integer -> "groupByKeyValue:" + integer);
}

运行结果如下,我们拿到想要的结果。

四、Map、Cast

Map操作符的功能类似于FlatMap,不同之处在于它对数据的转化是直接进行的,而FlatMap需要通过一些中间的Observables来进行。

Cast将Observable发射的数据强制转化为另外一种类型,属于Map的一种具体的实现

下面我们创建两个经过map和cast转化的Observable对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private Observable<Integer> mapObserver() {
return Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).map(integer -> integer * 10);
}

private Observable<Dog> castObserver() {
return Observable.just(getAnimal())
.cast(Dog.class);
}

Animal getAnimal() {
return new Dog();
}

class Animal {
protected String name = "Animal";

Animal() {
log("create " + name);
}

String getName() {
return name;
}
}

class Dog extends Animal {
Dog() {
name = getClass().getSimpleName();
log("create " + name);
}

}

对其进行注册:

1
2
3
4
mLButton.setText("Map");
mLButton.setOnClickListener(e -> mapObserver().subscribe(i -> log("Map:" + i)));
mRButton.setText("Cast");
mRButton.setOnClickListener(e -> castObserver().subscribe(i -> log("Cast:" + i.getName())));

运行后得到结果如下。可以看到,map操作符将数据都乘以10后再发射出来,cast操作符将Animal类型的对象强制转化为Dog类型的对象。另外我们还可以验证一下一个知识点,有继承的情况下创建对象会首先调用父类的构造方法哦。

五、Scan

Scan操作符对一个序列的数据应用一个函数,并将这个函数的结果发射出去作为下个数据应用这个函数时候的第一个参数使用,有点类似于递归操作

下面我们通过一个存放10个2的list创建一个Observable对象并使用scan对其进行转化,转化的函数就是计算的结果乘以下一个数。

1
2
3
private Observable<Integer> scanObserver() {
return Observable.from(list).scan((x, y) -> x * y).observeOn(AndroidSchedulers.mainThread());
}

对其进行订阅:

1
2
mLButton.setText("scan");
mLButton.setOnClickListener(e -> scanObserver().subscribe(i -> log("scan:" + i)));

得到结果如下,可以看到,我们输出了2的n次方。

六、Window

Window操作符类似于我们前面讲过的buffer,不同之处在于window发射的是一些小的Observable对象,由这些小的Observable对象来发射内部包含的数据。同buffer一样,window不仅可以通过数目来分组还可以通过时间等规则来分组

下面我们创建两个Observable对象分别使用window的数目和时间规则来进行分组。

1
2
3
4
5
6
7
8
9
private Observable<Observable<Integer>> windowCountObserver() {
return Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).window(3);
}

private Observable<Observable<Long>> wondowTimeObserver() {
return Observable.interval(1000, TimeUnit.MILLISECONDS)
.window(3000, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread());
}

分别对其订阅:

1
2
3
4
5
6
7
8
9
10
mLButton.setText("window");
mLButton.setOnClickListener(e -> windowCountObserver().subscribe(i -> {
log(i);
i.subscribe((j -> log("window:" + j)));
}));
mRButton.setText("Time");
mRButton.setOnClickListener(e -> wondowTimeObserver().subscribe(i -> {
log(i);
i.observeOn(AndroidSchedulers.mainThread()).subscribe((j -> log("wondowTime:" + j)));
}));

运行结果如下,可以看到第一个Observable对象没次发射出一个包含3个数据的小Observable,第二个Observable对象每隔3秒钟发射出一个包含2~4个数据的Observable对象

Transforming操作符是Rxjava强大之处的重要体现,要灵活使用Rxjava掌握Transforming操作符是必不可少的。

本文的demo程序见github