相关文章推荐
失落的皮蛋  ·  git ...·  2 周前    · 
私奔的硬盘  ·  记录.gitattributes ...·  2 周前    · 
淡定的白开水  ·  javascript ...·  2 月前    · 
温暖的火腿肠  ·  node.js - ...·  1 年前    · 
紧张的热带鱼  ·  png convert to svg ...·  1 年前    · 


关键词:合并 Observable

RxJava系列教程:

​​1. RxJava使用介绍​​ ​ ​【视频教程】​ ​​
​2. RxJava操作符
​​• Creating Observables(Observable的创建操作符)​​ ​ ​【视频教程】​ ​​
​​• Transforming Observables(Observable的转换操作符)​​ ​ ​【视频教程】​ ​​
​​• Filtering Observables(Observable的过滤操作符)​​ ​ ​【视频教程】​ ​​
​​• Combining Observables(Observable的组合操作符)​​ ​ ​【视频教程】​ ​​
​​• Error Handling Operators(Observable的错误处理操作符)​​ ​ ​【视频教程】​ ​​
​​• Observable Utility Operators(Observable的辅助性操作符)​​ ​ ​【视频教程】​ ​​
​​• Conditional and Boolean Operators(Observable的条件和布尔操作符)​​ ​ ​【视频教程】​ ​​
​​• Mathematical and Aggregate Operators(Observable数学运算及聚合操作符)​​ ​ ​【视频教程】​ ​​
​​• 其他如observable.toList()、observable.connect()、observable.publish()等等;​​ ​ ​【视频教程】​ ​​
​​3. RxJava Observer与Subcriber的关系​​ ​ ​【视频教程】​ ​​
​​4. RxJava线程控制(Scheduler)​​ ​ ​【视频教程】​ ​​
​​5. RxJava 并发之数据流发射太快如何办(背压(Backpressure))​​ ​ ​【视频教程】​


前言

在RxJava中, 同时处理多个Observables是很常见的一种操作。下面我们简单分析下几个组合类的操作符。

如果你不想看操作符的介绍,可以直接跳到项目实战部分。

Merge

在异步的世界经常会创建这样的场景,我们有多个来源但是只想有一个结果:多输入,单输出。RxJava的merge()方法将帮助你把两个甚至更多的Observables合并到他们发射的数据里。下图给出了把两个序列合并在一个最终发射的Observable。

正如你看到的那样,发射的数据被交叉合并到一个Observable里面。注意如果你同步的合并Observable,它们将连接在一起并且不会交叉。

示例代码

RxJava 合并组合两个(或多个)Observable数据源_RxJava

我们创建了Observable和observableApps数据以及新的observableReversedApps逆序列表。使用Observable.merge(),我们可以创建新的Observable——MergedObservable在单个可观测序列中发射源Observables发出的所有数据。

运行结果如下:

注意:
merge操作符合并后的Observable数据类型没有改变。

ZIP

我们在处理多源时可能会带来这样一种场景:多从个Observables接收数据,处理它们,然后将它们合并成一个新的可观测序列来使用。RxJava有一个特殊的方法可以完成:zip()合并两个或者多个Observables发射出的数据项,根据指定的函数Func*变换它们,并发射一个新值。下图展示了zip()方法如何处理发射的“numbers”和“letters”然后将它们合并一个新的数据项:

RxJava 合并组合两个(或多个)Observable数据源_RxJava_02

官网介绍更详细:​ ​http://reactivex.io/documentation/operators/zip.html​

如图可以看到,zip操作符并不是像merge操作符那样只合并了数据,重要的是发生了质的变化。

对于“真实世界”的例子来说,我们将使用已安装的应用列表和一个新的动态的Observable来让例子变得有点有趣味。

Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);

tictoc Observable变量使用interval()函数每秒生成一个Long类型的数据:简单且高效,正如之前所说的,我们需要一个Func对象。因为它需要传两个参数,所以是Func2:

private AppInfo updateTitle(AppInfoappInfo, Long time) {
appInfo.setName(time + " " + appInfo.getName());
return appInfo;
}

前面的loadList()函数变成这样:

RxJava 合并组合两个(或多个)Observable数据源_合并_03

正如你看到的那样,zip()函数有三个参数:两个Observables和一个Func2。

结果如下:

注意:
zip操作符合并后的Observable数据类型可以发生改变。

Join

前面两个方法,zip()和merge()方法作用在发射数据的范畴内,在决定如何操作值之前有些场景我们需要考虑时间的。RxJava的join()函数基于时间窗口将两个Observables发射的数据结合在一起。

RxJava 合并组合两个(或多个)Observable数据源_数据_04

为了正确的理解上一张图,我们解释下join()需要的参数:

  • 第二个Observable和源Observable结合。
  • Func1参数:在指定的由时间窗口定义时间间隔内,源Observable发射的数据和从第二个Observable发射的数据相互配合返回的Observable。
  • Func1参数:在指定的由时间窗口定义时间间隔内,第二个Observable发射的数据和从源Observable发射的数据相互配合返回的Observable。
  • Func2参数:定义已发射的数据如何与新发射的数据项相结合。

如下练习的例子,我们可以修改loadList()函数像下面这样:

RxJava 合并组合两个(或多个)Observable数据源_Observable_05

我们有一个新的对象appsSequence,它是一个每秒从我们已安装的app列表发射app数据的可观测序列。tictoc这个Observable数据每秒只发射一个新的Long型整数。为了合并它们,我们需要指定两个Func1变量:

appInfo -> Observable.timer(2, TimeUnit.SECONDS)

time -> Observable.timer(0, TimeUnit.SECONDS)

上面描述了两个时间窗口。下面一行描述我们如何使用Func2将两个发射的数据结合在一起。

this::updateTitle

结果如下:

它看起来有点乱,但是注意app的名字和我们指定的时间窗口,我们可以看到:一旦第二个数据发射了我们就会将它与源数据结合,但我们用同一个源数据有2秒钟。这就是为什么标题重复数字累加的原因。

值得一提的是,为了简单起见,也有一个join()操作符作用于字符串然后简单的和发射的字符串连接成最终的字符串。

combineLatest

RxJava的combineLatest()函数有点像zip()函数的特殊形式。正如我们已经学习的,zip()作用于最近未打包的两个Observables。相反,combineLatest()作用于最近发射的数据项:如果Observable1发射了A并且Observable2发射了B和C,combineLatest()将会分组处理AB和AC,如下图所示:

RxJava 合并组合两个(或多个)Observable数据源_RxJava_06

combineLatest()函数接受二到九个Observable作为参数,如果有需要的话或者单个Observables列表作为参数。

从之前的例子中把loadList()函数借用过来,我们可以修改一下来用于combineLatest()实现loadList():

RxJava 合并组合两个(或多个)Observable数据源_合并_07

这我们使用了两个Observables:一个是每秒钟从我们已安装的应用列表发射一个App数据,第二个是每隔1.5秒发射一个Long型整数。我们将他们结合起来并执行updateTitle()函数,结果如下:

RxJava 合并组合两个(或多个)Observable数据源_数据_08

正如你看到的,由于不同的时间间隔,AppInfo对象如我们所预料的那样有时候会重复。

And,Then和When

在将来还有一些zip()满足不了的场景。如复杂的架构,或者是仅仅为了个人爱好,你可以使用And/Then/When解决方案。它们在RxJava的joins包下,使用Pattern和Plan作为中介,将发射的数据集合并到一起。

RxJava 合并组合两个(或多个)Observable数据源_数据_09

我们的loadList()函数将会被修改从这样:

RxJava 合并组合两个(或多个)Observable数据源_合并_10

和通常一样,我们有两个发射的序列,observableApp,发射我们安装的应用列表数据,tictoc每秒发射一个Long型整数。现在我们用and()连接源Observable和第二个Observable。

JoinObservable.from(observableApp).and(tictoc);

这里创建一个pattern对象,使用这个对象我们可以创建一个Plan对象:”我们有两个发射数据的Observables,then()是做什么的?”

pattern.then(this::updateTitle);

现在我们有了一个Plan对象并且当plan发生时我们可以决定接下来发生的事情。

.when(plan).toObservable()

这时候,我们可以订阅新的Observable,正如我们总是做的那样。

Switch

有这样一个复杂的场景就是在一个subscribe-unsubscribe的序列里我们能够从一个Observable自动取消订阅来订阅一个新的Observable。

RxJava的switch(),正如定义的,将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项。

给出一个发射多个Observables序列的源Observable,switch()订阅到源Observable然后开始发射由第一个发射的Observable发射的一样的数据。当源Observable发射一个新的Observable时,switch()立即取消订阅前一个发射数据的Observable(因此打断了从它那里发射的数据流)然后订阅一个新的Observable,并开始发射它的数据。

StartWith

我们已经学到如何连接多个Observables并追加指定的值到一个发射序列里。RxJava的startWith()是concat()的对应部分。正如concat()向发射数据的Observable追加数据那样,在Observable开始发射他们的数据之前, startWith()通过传递一个参数来先发射一个数据序列。

RxJava 合并组合两个(或多个)Observable数据源_操作符_11

小结

通过上面分析,我们学习了如何将两个或者更多个Observable结合来创建一个新的可观测序列。我们将能够merge Observable,join Observables ,zip Observables 并在几种情况下把他们结合在一起。

说了那么多,还是来个实际场景吧。

项目实战

需求

一次下载多张图片

思路

1.使用from操作符创建图片url列表数据流
2.获取单个url,并下载

代码如下:

RxJava 合并组合两个(或多个)Observable数据源_操作符_12

注意:本代码并没有实现保存图片的功能

这样确实实现了下载图片的功能,但是在onNext方法里面如果我们想根据url地址保存图片的名字,这就出现问题了,onNext方法里我们只能获得ResponseBody,怎么解决呢?

解决问题的关键还是要在flatmap操作符中的Func1方法中,只要把url信息在这里返回即可,也就是将ResponseBody和url撮合在一起。

通过查找api,终于发现了适合我们的合并Observable的zip操作符。为什么选择zip操作符而不用merge操作符呢?文章开头已经说明了。

最终代码如下:

RxJava 合并组合两个(或多个)Observable数据源_合并_13

图片如果看不清楚,鼠标右键在新标签页查看即可。

这个是我的实现方式,如果你有更好的,请留言指出!

That’s all, thank you !

2016.09.0

Node做java中间件 java后端中间件

笔记记录了分布式架构中广泛使用的微服务中间件的使用文档一 架构认识微服务技术对比 spring官方地址:Spring | Homeboot与cloud兼容性:二 中间件使用eureka 一 搭建EurekaServe1 创建springboot工程,引入依赖<!-- eureka服务端依赖--> <dependency>

java 设置线程并发度 java线程并发工具类
在JDK的并发包里提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数据的一种手段。 1等待多线程完成的CountDownLatch CountDownLatch:CountDownLatch允许一个或多个线程等待其他线程