原文:Design Patterns and Best Practices in Java
协议:CC BY-NC-SA 4.0
贡献者:飞龙
本文来自【ApacheCN Java 译后编辑(MTPE)流程来尽可能提升效率。
本章将描述反应编程范式,以及为什么它可以很好地适用于具有函数元素的语言。读者将熟悉反应编程背后的概念。我们将介绍在创建反应应用程序时从观察者模式和迭代器模式中使用的元素。这些例子将使用反应框架和名称 (版本 2.0)的 Java 实现。
我们将讨论以下主题:
- 什么是反应编程?
- RxJava 简介
- 安装 RxJava
- 观察对象、流动对象、观察者和订阅
- 创建可观察对象
- 变换可观察对象
- 过滤可观察对象
- 组合可观察对象
- 错误处理
- 调度者
- 主题
- 示例项目
什么是反应编程?
根据《反应宣言》,无功系统具有以下属性:
- :系统以一致、可预测的方式及时响应。
- :系统对故障有弹性,能迅速恢复。
- :通过增加或减少分配的资源,系统在不同的工作负载下保持响应能力。这是通过动态搜索和修复瓶颈来实现的。这不能与可伸缩性混淆。弹性系统需要根据需要上下伸缩-见此页面。
- :依靠异步信息传输,确保松耦合、隔离、位置透明和容错。
需求是真实的。如今,无响应系统被认为是有缺陷的,用户将避免使用。根据这个页面,搜索引擎中一个没有回应的网站排名很低:
响应式设计是谷歌的推荐设计模式
反应系统是一种利用元素构成复杂系统的架构风格,有些元素是通过反应编程技术构建的。
反应编程是一个依赖于异步数据流的例子。它是异步编程的事件驱动子集。相反,反应系统由新闻驱动,这意味着接收器提前知道,接收器可以是任何观察者。
反应式编程不仅仅是基于事件的编程,因为它利用了数据流,它强调数据流而不是控制流。过去,鼠标或键盘事件或服务器上的新连接等后端事件都是在线事件循环(thread of execution)中处理的。现在一切都可以用来创建数据流;假设来自后端点 JSON REST 可以等待、过滤或与来自不同端点的其他响应合并的数据流。该方法通过消除开发人员的显式创建多核和多核 CPU 所有样板代码在环境中处理异步调用的需求提供了极大的灵活性。
电子表格示例是最好、最过度使用的反应编程示例之一。定义流(flow)类似于声明 Excel 的 C1 单元格的值等于 B1 单元格和 A1 单元格的内容。每当 A1 或 B1 单元更新时,会观察到变化并做出反应,其副作用是 C1 值得更新。现在假设 C2 到 Cn 单元格等于 A2 到 An 加上 B2 到 Bn 同样的规则适用于所有单元格。
反应编程采用以下编程抽象,有些来自函数编程世界:
- :这些为在不久的将来某个地方提供的价值采取行动提供了一种手段。
- :它为列车运行提供了数据管道,就像列车轨道一样。
- :这些是应用于流函数的输入变量函数的结果,就像电子表单元格一样,通过应用加号数学函数来设置两个给定的输入参数。
- :该机制用于实时处理环境,包括()等待硬件,通过丢弃元件调整输入处理速度,以赶上输入速度;作为背压策略。
- :这类似于好莱坞的原则,因为它逆转了呼叫的方向。一旦数据可用,请调用流中的相关观察者来处理数据;相反,拉动机制同步获取信息。
有许多 Java 例如,库和框架允许程序员编写反应代码 Reactor、Ratpack、RxJava、Spring Framework 5 和 Vert.x。通过添加 JDK9 Flow API,无需安装其他反应编程,开发人员可以使用反应编程 API。
RxJava 简介
RxJava 是从 Microsoft.NET 实现世界移植的反应扩展(一个库,用于使用可观察序列编写异步和基于事件的程序)。2012 年,Netflix 意识到他们需要一个范式的改变,因为他们的架构无法应对庞大的客户群,他们决定通过引入无功扩展的力量 JVM 实现世界无功扩张;RxJava 就是这样诞生的。除了 RxJava 还有别的 JVM 实现,比如 RxAndroid、RxJavaFX、RxKotlin 和 RxScale。这种方法给了他们想要的动力,它也为我们提供了使用它的机会。
RxJavaJar 是根据 Apache 软件许可证 2.0 获得许可的版本可以在中央 Maven 在仓库中获得。
使用多个外部库 RxJava:
hystrix
:隔离远程系统访问点的延迟和容错库rxjava-http-tail
:一个 HTTP 日志跟踪库可用tail -f
相同rxjava-jdbc
:使用 RxJava 和到ResultSets
流的 JDBC 连接
安装 RxJava 框架
本节将介绍 Maven 的 RxJava 安装(Gradle、SBT、Ivy、Grape、Leiningen 或 Buildr 步骤相似)和 Java9 的 replJShell 的用法。
Maven 安装
安装 RxJava 框架很容易。JAR 依赖的文件和项目反应流在 Maven 可以在下面的页面中使用。
在你的pom.xml
这个包含在文件中 Maven 依赖项:
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.packt.java9</groupId> <artifactId>chaper6_client</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/io.reactivex.
rxjava2/rxjava -->
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.1.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.
reactivestreams/reactive-streams -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.1</version>
</dependency>
</dependencies>
</project>
安装在 Gradle、SBT、Ivy、Grape、Leiningen 或 Buildr 中类似;查看这个页面了解需要添加到configuration
文件的更多信息。
JShell 安装
我们将在第 9 章“Java 最佳实践”中详细讨论 JShell,现在让我们从 RxJava 的角度来看一下。在 JShell 中安装 RxJava 框架是通过将 classpath 设置为 RxJava 和 reactive streams JAR 文件来完成的。请注意,Linux 上使用冒号,Windows 上使用分号作为文件路径分隔符:
"c:Program FilesJavajdk-9binjshell" --class-path D:Kitsrxjavarxjava-2.1.3.jar;D:Kitsrxjavareactive-streams-1.0.1.jar
屏幕上将显示以下错误:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-92RYJCXQ-1657721282484)(img/081f90ec-4fec-4cf3-bfe5-4793220a018d.png)]
前面的错误是因为我们忘记导入相关的 Java 类。
以下代码处理此错误:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wNIrKRWU-1657721282485)(img/fd64466a-2b93-43ce-9fe2-a4fc81f8dd59.png)]
现在我们已经成功地创建了我们的第一个可观察对象。在下面的部分中,我们将学习它的功能以及如何使用它。
可观察对象、可流动对象、观察者和订阅者
在 ReactiveX 中,观察者订阅一个可观察的对象。当观察者发射数据时,观察者通过消耗或转换数据做出反应。这种模式便于并发操作,因为它不需要在等待可观察对象发出对象时阻塞。相反,它以观察者的形式创建了一个哨兵,随时准备在以观察者的形式出现新数据时做出适当的反应。这个模型被称为反应堆模式。下图取自这个页面,解释了可观测数据流:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rI7dsGXh-1657721282486)(img/b33888d8-4408-4fd5-9215-3a609f5cea79.png)]
反应式的可观察性与祈使式的可观察性相似。它解决了相同的问题,但策略不同。一旦可用,Observable
通过异步推送更改来工作,而Iterable
则以同步方式拉送更改机制。处理错误的方法也不同;一种使用错误回调,而另一种使用副作用,例如抛出异常。下表显示了差异:
| 事件 | 可迭代对象 | 可观察对象 | | — | — | | 获取数据 | T next()
| onNext(T)
| | 错误 | throw new Exception
| onError(Exception)
| | 完成 | Return
| onCompleted()
|
使用订阅(onNextAction
、onErrorAction
、onCompletedAction
)方法将观察者连接到被观察者。观察者实现以下方法的一些子集(只有onNext
是必需的):
onNext
:每当被观察对象发出一个项目时调用,方法以被观察对象发出的项目作为参数onError
:调用它是为了表示它没有生成预期的数据或遇到了其他错误,并将异常/错误作为它的参数onCompleted
:当没有更多的数据要发出时调用
从设计的角度来看,反应式可观测对象通过使用onError
和onCompleted
回调来增加在完成和错误时发出信号的能力,从而增强了四人帮的观察者模式。
有两种类型的反应式观察结果:
- :即使没有连接用户,也会尽快开始发送。
- :在开始发送数据之前,等待至少一个订户连接,因此至少一个订户可以从一开始就看到序列。它们被称为“可连接的”可观察对象,RxJava 拥有能够创建此类可观察对象的操作符。
RxJava2.0 引入了一种新的可观察类型,称为Flowable
。新的io.reactivex.Flowable
是一个支持背压的基本反应类,而可观察的不再是。背压是一组策略,用于处理当可观察对象发出订户可以处理的更多数据时的情况。
RxJava Observable
应用于小数据集(最长不超过 1000 个元素),以防止OutOfMemoryError
或用于 GUI 事件,例如鼠标移动或小频率(1000 Hz 或以下)的触摸事件。
在处理超过 10000 个元素、从磁盘读取(解析)文件(这在背压下很好地工作)、通过 JDBC 从数据库读取数据或执行基于块和/或拉的数据读取时,将使用Flowable
。
创建可观察对象
以下操作符用于从现有对象、其他数据结构的数组或序列或计时器中从头开始创建可观察对象。
创建操作符
可以通过调用以下io.reactivex.Observable
方法之一(操作符)从头开始创建可观察对象:
- 创建
- 生成
- 不安全创建
下面的示例演示如何从头开始构造一个可观察的。调用onNext()
直到观察者没有被释放,onComplete()
和onError()
以编程方式获得 1 到 4 的数字范围:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ThKf4rud-1657721282486)(img/5d7d884e-4933-47c8-822e-433252cffe4d.png)]
正如我们在前面的屏幕截图中所看到的,输出与预期一样,范围从 1 到 4,序列在使用后会被处理掉。
延迟运算符
一旦观察者连接,可以通过调用defer
方法为每个观察者创建一个新的观察者。以下代码显示了defer
在我们提供号码时的用法:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LdmVgfaQ-1657721282487)(img/d13e22a4-5612-4842-8604-45131c8b57e5.png)]
控制台println
方法输出 123,这是可观察的整数。
空运算符
可以通过调用empty()
或never() io.reactivex.Observable
方法来创建从不发送的空项目。
from
运算符
通过调用以下方法之一,可以从数组、Future
或其他对象和数据结构进行转换:
fromArray
:将数组转换为可观察数组fromCallable
:将提供值的Callable
转换为Observable
fromFuture
:将Future
提供的值转换为可观察的值fromIterable
:将Iterable
转换为Observable
fromPublisher
:将反应发布者流转换为可观察发布者流just
:将给定对象转换为可观察对象
下面的示例从字母列表(abc
)中创建一个Observable
:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fCSJCUCU-1657721282488)(img/3827aba1-f52f-4a66-a1ed-1c62e1f65eda.png)]
a
、b
和c
的整个数组被消耗,并通过System.out.println
方法打印到控制台。
区间运算符
通过使用interval
方法,可以创建一个可观察的对象,该对象发出一个由特定时间间隔间隔隔开的整数序列。下面的示例从不停止;它每秒钟连续打印一次记号号:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BKagf3IF-1657721282488)(img/fb9d71db-f693-4721-be43-73004eac1fa4.png)]
尝试停止计时器也无济于事(即使是Ctrl + C
,只要关闭窗口),它会继续按指令每隔一秒向控制台打印递增的数字。
定时器运算符
通过使用计时器方法,可以在给定的延迟之后发出单个项目。
范围运算符
可以使用以下方法创建序列号范围:
intervalRange
:发出一系列长值的信号,第一个在一些初始延迟之后,接下来是周期性的range
:发出指定范围内的整数序列
重复运算符
为了重复特定的项目或特定的顺序,请使用:
repeat
:重复给定可观测源发射的项目序列多次或永远(取决于输入)repeatUntil
:重复可观测源发出的项目序列,直到提供的stop
函数返回true
repeatWhen
:除了onComplete
之外,发出与初始可观察对象相同的值
以下代码重复给定的a
值,直到满足条件:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bDrD4mfD-1657721282489)(img/417d530b-771a-41b7-b2b6-b8fa3283219f.png)]
它向控制台重复三次a
,直到x
的值 3 大于 2。作为练习,将x++
替换为++x
并检查控制台。
转换可观测对象
这些运算符转换由可观察对象发出的项。
订阅操作符
这些是订户用来消耗来自可观察对象的发射和通知的方法,例如onNext
、onError
和onCompleted
。用于订阅的可观察方法有:
blockingForEach
:消耗此可观察对象发出的每个项目,并阻塞直到可观察对象完成。blockingSubscribe
:订阅当前线程上的可观察事件并消耗事件。forEachWhile
:订阅Observable
并接收每个元素的通知,直到onNext
谓词返回false
。forEach
:订阅可观察到的元素并接收每个元素的通知。subscribe
:将给定的观察者订阅到该可观察对象。观察器可以作为回调、观察器实现或抽象io.reactivex.subscribers.DefaultSubscriber<T>
类的子类型提供。
缓冲区运算符
buffer
方法用于创建给定大小的包,然后将它们打包为列表。下面的代码显示了如何在 10 个数字中创建两个bundle
,一个有 6 个,另一个有其余 4 个:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xvax2Bwa-1657721282490)(img/c4292f4c-a67d-44da-99b9-07359a89daa5.png)]
展开映射操作符
通过使用以下操作符之一,可以通过到达顺序(flatMap
)、保持最后发射的顺序(switchMap
)或通过保持原始顺序(concatMap
)将给定的可观察对象转换为单个可观察对象:concatMap
、concatMapDelayError
、concatMapEager
、concatMapEagerDelayError
、concatMapIterable
、flatMap
、flatMapIterable
、switchMap
,或switchMapDelayError
。下面的示例演示了如何通过随机选择可观察对象的顺序来更改输出的内容。(flatMap
、concatMap
、switchMap
:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5a7ELn9t-1657721282491)(img/c3fdb310-ec85-4469-86b6-329586a071bd.png)]
concatMap
实现将c
字符串附加到给定的a
、b
和c
字符串中的每一个,因此,输出是ac
、bc
和cc
。
flatMap
实现将f
字符串附加到给定的a
、b
和c
字符串中的每一个,如下所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PeMOYZtM-1657721282491)(img/96750fcd-8d66-48dc-b62d-dea37238d9bf.png)]
由于随机延迟,顺序与预期的af
、bf
、cf
不同,运行几次就会输出预期的顺序。
下面的代码段显示了不同的输出。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AmB5BdJs-1657721282491)(img/98226dc4-761c-43f3-a4cf-d70b57c77f86.png)]
switchMap
实现将s
字符串附加到给定的a
、b
和c
字符串列表中的最后一个元素。
注意advanceTimeBy
的用法。没有这个电话,什么都不会打印,因为发射被推迟了。
分组运算符
groupBy
用于将一个可观察对象划分为一组可观察对象,每个可观察对象发出一组不同的项目。下面的代码按起始字母对字符串进行分组,然后打印键和特定键的组数据。请注意,这些组是可观察的,可用于构造其他数据流。
以下输出按第一个字母显示组作为一个组,并显示组键(即第一个字母):
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6TxWBqfd-1657721282492)(img/44c6e7ea-5442-4bf8-bea1-6c8f0e77e773.png)]
映射运算符
为每个项目应用一个函数来转换可观察对象可以通过以下方法实现:
cast
:将结果强制转换为给定类型map
:对每个发出的项目应用指定的函数
扫描运算符
利用积累的转换可以用scan
方法来完成。以下代码通过发出元素的当前和来使用它:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WO0qsS8y-1657721282492)(img/2430126e-d52b-4730-b9bb-a70032b24ff7.png)]
窗口操作符
window
方法用于周期性地将项目从一个可观察窗口细分为可观察窗口,并突发发射这些窗口。下面的代码显示,使用一个元素的窗口不起任何作用,同时使用三个元素输出它们的总和:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RwOERutY-1657721282493)(img/4d1158a4-c0ab-495a-a14c-f298825df7e3.png)]
过滤可观察对象
这些操作符根据给定的条件/约束从给定的可观察对象选择性地发射项。
去抖动算符
只能在经过特定时间跨度后发射,可以使用以下方法:
debounce
:镜像最初的可观察项,除了它删除源发出的项,然后在一段时间内删除另一项throttleWithTimeout
:仅发射那些在指定时间窗口内没有后跟另一个发射项的项
在下面的示例中,我们将删除在 100 毫秒的去抖动时间跨度过去之前触发的项;在我们的示例中,它只是最后一个管理的值。同样,通过使用测试调度器,我们提前了时间:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YoloRy13-1657721282494)(img/14796180-af89-486d-b2c6-af30de36f343.png)]
去重运算符
这将使用以下方法删除可观察对象发出的不同项:
distinct
:只发射不同的元素distinctUntilChanged
:仅发射与其直接前辈不同的元素
在下面的代码中,我们将看到如何使用distinct
方法从给定序列中删除重复项:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ivho3r5G-1657721282494)(img/04062498-1881-48c2-a36b-cafb2aa80c50.png)]
我们可以看到重复的aaa
字符串已经从输出中删除。
获取元素运算符
为了通过索引获得元素,使用elementAt
方法。以下代码打印列表中的第三个元素:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yjv0WNQf-1657721282495)(img/0d1cf753-0142-4ac3-bdb3-f27485bfaebc.png)]
过滤运算符
在以下方法上使用只允许从通过测试(谓词/类型测试)的可观察对象中发出那些项:
filter
:只发出满足指定谓词的元素ofType
:只发出指定类型的元素
以下代码显示了filter
方法的用法,用于过滤掉不以字母a
开头的元素:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VOVoGHtg-1657721282495)(img/b231376c-4dd4-4c41-a306-c5c36fdb8475.png)]
第一个/最后一个运算符
这些方法用于根据给定条件返回项目的第一个和最后一个匹配项。也有阻塞版本可用。可用的io.reactivex.Observable methods
是:
blockingFirst
:返回可观察对象发出的第一项blockingSingle
:返回可观察对象发出的第一个Single
项first
:返回可观察对象发出的第一项firstElement
:返回仅发射第一个项目的Maybe
single
:返回仅发射第一个项目的Single
singleElement
:返回一个只发出第一个单曲的Maybe
blockingLast
:返回可观察对象发出的最后一项last
:返回可观察对象发出的最后一项lastElement
:返回只发出最后一个单曲的Maybe
示例运算符
使用此运算符可发射特定项目(由采样时间段或节气门持续时间指定)。io.reactivex.Observable
提供以下方法:
sample
:在给定的时间段内发出最近发出的项目(如果有)throttleFirst
:仅发射给定连续时间窗口内发射的第一个项目throttleLast
:仅发射给定连续时间窗口内发射的最后一项
跳过运算符
从可观察的输出中删除第n
个倒数第n
个元素。以下代码显示了如何跳过给定输入的前三个元素:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VPHdC0bZ-1657721282495)(img/97de704a-97d6-44a5-9d1d-d5c670cc354c.png)]
调用skipLast
方法将只输出 1 和 2。
选取运算符
它只从给定的可见光发送第n
个倒数第n
个元素。以下示例显示如何仅从可观察的数值范围中获取前三个元素:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DmJs58Cw-1657721282496)(img/e8d8122d-3c2c-4d1e-bc61-ae5c9313715c.png)]
使用具有相同参数的takeLast
方法将输出 3、4 和 5。
组合可观察对象
这些运算符用于组合多个可观察对象。
联合运算符
通过调用以下方法之一,组合来自两个或多个可观测对象的最新发射值:
combineLatest
:发出聚合每个源的最新值的项withLatestFrom
:将给定的可观察对象合并到当前实例中
下面的示例(永远运行)显示了组合两个具有不同时间跨度的间隔可观察对象的结果—第一个每 6 毫秒发射一次,另一个每 10 毫秒发射一次:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OPZGHU8f-1657721282496)(img/5c5081da-a559-42b7-b9fe-33696c536ca3.png)]
前面代码的执行需要通过按Ctrl + C
停止,因为它创建了一个无限列表。输出与预期一样,它包含基于创建时间戳的两个序列的组合值。
连接运算符
通过调用以下方法之一,可以基于给定窗口组合两个可观察对象:
join
:使用聚合函数,根据重叠的持续时间,将两个可观察对象发出的项目连接起来groupJoin
:使用聚合函数,根据重叠的持续时间,将两个可观察对象发出的项目加入到组中
下面的示例使用join
组合两个可观察对象,一个每 100 毫秒触发一次,另一个每 160 毫秒触发一次,并每 55 毫秒从第一个值中获取一个值,每 85 毫秒从第二个值中获取一个值:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uPpPHYpI-1657721282497)(img/55cdf0f8-ce56-4452-a25d-5b04eaf29dc7.png)]
前面的代码永远执行,需要手动停止。
合并运算符
将多个可观察对象合并为一个可观察对象,所有给定的发射都可以通过调用:
merge
:将多个输入源展开为一个可观察源,无需任何转换mergeArray
:将作为数组给出的多个输入源展开为一个可观察源,而不进行任何转换mergeArrayDelayError
:将作为数组给出的多个输入源展开为一个可观察源,没有任何转换,也没有被错误打断mergeDelayError
:将多个输入源展开为一个可观察源,没有任何转换,也没有被错误打断mergeWith
:将这个和给定的源展开为一个可观察的,没有任何转换
在下面的示例中,我们将合并原始 1 到 5 范围的部分,合并方式是它包含所有条目,但顺序不同:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-INSCnaFz-1657721282497)(img/6e6af107-c6bc-4a15-be00-3023a00ad305.png)]
压缩运算符
基于组合器函数将多个可观察项组合成单个可观察项可以通过调用:
zip
:将指定的组合器函数的结果应用于给定可观测项所发射的多个项目的组合zipIterable
:发出一个指定的组合器函数的结果,该函数应用于给定的可观测项发出的多个项的组合zipWith
:发出一个指定的组合器函数的结果,该组合器函数应用于这个和给定的可观察对象的组合
下面的代码显示了如何基于字符串连接组合器将zip
应用于从 1 到 5 到 10 到 16(更多元素)的范围发出的元素。请注意,由于没有要应用的对应项,因此不会应用额外的发射(编号 16):
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4yVR7s0O-1657721282497)(img/eb0fc961-36a5-4e0b-a200-3ac7f2b45424.png)]
错误处理
Observable
包含几个操作符,这些操作符允许错误处理、吞咽异常、转换异常、调用finally
块、重试失败的序列以及即使发生错误也可以处理资源。
捕获运算符
这些运算符可以通过继续执行以下顺序从错误中恢复:
onErrorResumeNext
:指示一个可观察对象将控制权传递给供应器提供的另一个可观察对象,而不是在出现问题时调用onError
onErrorReturn
:指示可观察对象发出函数提供的默认值,以防出现错误onErrorReturnItem
:指示可观察对象发出提供的缺省值,以防出现错误onExceptionResumeNext
:指示一个可观察对象将控制传递给另一个可观察对象,而不是在出现问题时调用onError
下面的示例演示如何使用onErrorReturnItem
方法;不使用flatMap
技巧调用它将停止流并在最后输出Default
。通过延迟对异常抛出代码的调用并对其应用onErrorReturnItem
,我们可以继续序列并使用提供的默认值:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HecmaufD-1657721282498)(img/672adc55-ebf3-4bb3-89de-850771d01fd4.png)]
do
运算符
这些用于注册对特定生命周期事件采取的操作。我们可以使用它们来模拟final
语句行为,释放分配给上游的资源,进行性能度量,或者执行不依赖于当前调用成功与否的其他任务。RxJava Observable
通过提供以下方法来实现这一点:
doFinally
:注册当前可观察对象调用onComplete
或onError
或被释放时要调用的动作doAfterTerminate
:在当前可观察对象调用onComplete
或onError
之后注册要调用的动作doOnDispose
:注册一个动作,在处理序列时调用doOnLifecycle
:根据序列的生命周期事件(订阅、取消、请求),为相应的onXXX
方法注册回调doOnTerminate
:注册当前可观察对象调用onComplete
或onError
时要调用的动作
以下代码段显示了前面提到的命令的用法:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-D88AtnUt-1657721282498)(img/56ecf8ec-7db3-4947-85b0-5801ba97a7b5.png)]
在前面的示例中,我们可以看到生命周期事件的顺序是:订阅、终止、完成或错误,最后通过在每个事件上注册控制台打印操作。
using
运算符
using
操作符在 Java 中有一个对应的操作符,名为资源尝试。它基本上也是这样做的,即创建一个在给定时间(当可观察对象被释放时)被释放的可支配资源。RxJava2.0 方法using
实现了这个行为。
重试运算符
这些是在发生可恢复的故障(例如服务暂时关闭)时要使用的操作符。他们通过重新订阅来工作,希望这次能顺利完成。可用的 RxJava 方法如下:
retry
:错误时永远重放同一流程,直到成功retryUntil
:重试,直到给定的stop
函数返回true
retryWhen
:基于接收错误/异常的重试逻辑函数,在错误情况下永远重放相同的流,直到成功为止
在下面的示例中,我们使用只包含两个值的zip
来创建重试逻辑,该逻辑在一个时间段后重试两次以运行失败的序列,或者用 500 乘以重试计数。当连接到无响应的 Web 服务时,尤其是从每次重试都会消耗设备电池的移动设备时,可以使用此方法:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aXhV9JCk-1657721282499)(img/70d6545a-2809-4a40-8d32-b37e6e937ae9.png)]
调度器
在线程调度方面,可观测是不可知的——在多线程环境中,这是调度器的工作。一些操作符提供了可以将调度器作为参数的变体。有一些特定的调用允许从下游(使用操作符的点,这是observeOn
的情况)或不考虑调用位置(调用位置无关紧要,因为这是subscribeOn
方法的情况)观察流。在下面的示例中,我们将从上游和下游打印当前线程。注意,在subscribeOn
的情况下,线程总是相同的:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5qza8GLT-1657721282499)(img/4acf684b-55fd-4938-845d-b40f900a1022.png)]
注意map
方法中的线程主要用法:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-y5mpwm36-1657721282499)(img/027fbe82-d16e-4588-99b5-8abae35211b6.png)]
请注意,map
方法不再使用线程main
。
RxJava2.0 提供了更多来自io.reactivex.schedulers.Schedulers
工厂的调度器,每个调度器都有特定的用途:
computation()
:返回用于计算工作的Scheduler
实例io()
:返回一个用于 I/O 工作的Scheduler
实例single()
:对于需要在同一后台线程上强顺序执行的工作,返回Scheduler
实例trampoline()
:返回一个Scheduler
实例,该实例在一个参与线程上以 FIFO 方式执行给定的工作newThread()
:返回一个Scheduler
实例,该实例为每个工作单元创建一个新线程from(Executor executor)
:将Executor
转换成新的Scheduler
实例,并将工作委托给它
有一个只用于特殊测试目的的Scheduler
,称为io.reactivex.schedulers.TestScheduler
。我们已经使用了它,因为它允许手动推进虚拟时间,因此非常适合于测试依赖于时间的流,而不必等待时间通过(例如,单元测试)。
主体
主体是可观察的和订户的混合体,因为它们都接收和发射事件。RxJava2.0 提供了五个主题:
AsyncSubject
:仅发射源可观测到的最后一个值,后跟一个完成BehaviorSubject
:发射最近发射的值,然后是可观测源发射的任何值PublishSubject
:仅向订阅方发送订阅时间之后源发送的项目ReplaySubject
:向任何订户发送源发出的所有项目,即使没有订阅UnicastSubject
:只允许单个用户在其生存期内订阅
示例项目
在下面的示例中,我们将展示 RxJava 在实时处理从多个传感器接收到的温度中的用法。传感器数据由 Spring 引导服务器提供(随机生成)。服务器配置为接受传感器名称作为配置,以便我们可以为每个实例更改它。我们将启动五个实例,并在客户端显示警告,如果其中一个传感器输出超过 80 摄氏度。
使用以下命令可以从 bash 轻松启动多个传感器:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TiNyK0FI-1657721282500)(img/2570a499-f35d-448f-92a0-e008531b9272.png)]
服务器端代码很简单,我们只配置了一个 REST 控制器,将传感器数据输出为 JSON,如下代码所示:
@RestController
publicclass SensorController
{
@Value("${sensor.name}")
private String sensorName;
@RequestMapping(value="/sensor", method=RequestMethod.GET,
produces=MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<SensorData> sensor() throws Exception
{
SensorData data = new SensorData(sensorName);
HttpHeaders headers = new HttpHeaders();
headers.set(HttpHeaders.CONTENT_LENGTH, String.valueOf(new
ObjectMapper().writeValueAsString(data).length()));
returnnew ResponseEntity<SensorData>(data, headers,
HttpStatus.CREATED);
}
}
传感器数据是在SensorData
构造器中随机生成的(注意 Lombock 库的使用,以摆脱获取设置器代码):
@Data
publicclass SensorData
{
@JsonProperty
Double humidity;
@JsonProperty
Double temperature;
@JsonProperty
String sensorName;
public SensorData(String sensorName)
{
this.sensorName = sensorName;
humidity = Double.valueOf(20 + 80 * Math.random());
temperature = Double.valueOf(80 + 20 * Math.random());
}
}
现在我们已经启动了服务器,我们可以从支持 RxJava 的客户端连接到它。
客户端代码使用 rxapache http 库:
publicclass Main
{
@JsonIgnoreProperties(ignoreUnknown = true)
staticclass SensorTemperature
{
Double temperature;
String sensorName;
public Double getTemperature()
{
return temperature;
}
publicvoid setTemperature(Double temperature)
{
this.temperature = temperature;
}
public String getSensorName()
{
return sensorName;
}
publicvoid setSensorName(String sensorName)
{
this.sensorName = sensorName;
}
@Override
public String toString()
{
return sensorName + " temperature=" + temperature;
}
}
}
SensorTemperature
是我们的客户资料。它是服务器可以提供的内容的快照。其余信息将被 Jackson 数据绑定器忽略:
publicstaticvoid main(String[] args) throws Exception
{
final RequestConfig requestConfig = RequestConfig.custom()
.setSocketTimeout(3000)
.setConnectTimeout(500).build();
final CloseableHttpAsyncClient httpClient = HttpAsyncClients.custom()
.setDefaultRequestConfig(requestConfig)
.setMaxConnPerRoute(20)
.setMaxConnTotal(50)
.build();
httpClient.start();
在前面的代码中,我们通过设置 TCP/IP 超时和允许的连接数来设置并启动 HTTP 客户端:
Observable.range(1, 5).map(x ->
Try.withCatch(() -> new URI("http", null, "127.0.0.1", 8080 + x, "/sensor", null, null), URISyntaxException.class).orElse(null))
.flatMap(address -> ObservableHttp.createRequest