Java编程方法论:响应式RxJava与代码设计实战
上QQ阅读APP看本书,新人免费读10天
设备和账号都新为新人

1.4 响应式开发工具库

已经有很多工具库实现了响应式流的标准,包括Akka、Reactor、RxJava、Streams、Vert.x等。下面简单介绍几种,在后面的章节中我会重点讲解RxJava(关于Reactor,会在本系列丛书的另一本书中具体讲解)。

1.4.1 RxJava简介

通过官方GitHub可知,RxJava是使用Java语言开发的专门针对JVM的一种响应式扩展工具,通过它可以轻松地在服务器端实现并发操作。RxJava的目的就是处理客户端越来越复杂的请求,在服务器端通过并行计算快速地响应请求。

接下来,我们开始了解RxJava到底是怎么一回事,作为数据的消费者,我们会对获得的数据做出各种反应。有句话说得好,“跳出三界外,不在五行中”,在我们以旁观者的视角来看这件事的时候,其实它就是一个观察者模式的实际体现。RxJava下的响应式编程其实就是基于影院里的电影(Observable)提供的内容(生产者数据)传播给订阅者,然后订阅者做出相应的反应。

结合上面的场景,下面对RxJava所涉及的要点进行解释。

● Observable:表示数据源,Observable会发出一定数量的元素,发送可能会成功,也可能在这个过程中出现状况而失败。从电影院的场景可以知道,同一时间Observable可以有多个订阅者。

● Observer或Subscriber:表示订阅者,通过监听Observable来消费Observable所发送的元素。

● Methods:表示一系列操作API,对下发数据进行加工整合。

● onNext:在一个元素被Observable发送出去的时候,通过该方法可以调用每一个订阅者。

● onComplete:在Observable成功发送完所有数据后,会调用这个方法来收尾。

● onError:当Observable发送数据的过程中出现错误的状况时,会调用这个方法结束发送并返回一个error事件。

RxJava所带来的好处主要如下。

● 允许我们进行一系列的异步操作。

● 有时为了跟踪状态,我们会通过一个原子类变量保存之前计算的值。我们无须专门使用原子类来跟踪状态,因为RxJava中已经封装了这些操作。

● RxJava提供了一个在整个执行过程中发生错误时的处理途径。

1.4.2 Reactor简介

Spring 5官方文档提到,其通过Reactor的支持,在服务器端获得了更高的性能和更快的响应速度。Spring WebFlux作为新一代的Web开发框架,以Reactor作为基础框架进行异步编程的开发,从而可以使我们写出性能更好的Web应用程序。如果大家看过我博客中关于Spring 5源码分析的系列文章,就可以知道Spring MVC框架的整个运行过程其实使用了事件驱动(Event-Driven)模式,而Reactor自身的设计也使用了这个模式。参考前面的电影院场景,当电影的画面和声音传到你的眼睛和耳朵中时,会引发你喜怒哀乐的情绪反应。再形象一点,在拳击选手一个直拳要打到对方选手脸上的时候,对方选手会躲闪。电影的画面和拳击选手的直拳动作都是一种事件的表现,根据事件做出相应处理的整个过程就是所谓的事件驱动。在Spring里,这就相当于我们的后台服务器接收事件请求,通过multicaster多路分发器来分发给相应的监听器(Listener),最后由监听器里定义的相应的handler来做具体处理。整个过程大概就是这样的。

知道了上面的这些内容,通过使用Reactor,我们写出的程序就可以按照事件驱动的模式很轻易地异步运行了。

1. Reactor的优点

Reactor支持完全无阻塞,其主要的目标之一就是解决传统Web开发方案对于异步支持的各种弊病。它提供了十分有效的途径来支持背压。它还有以下优点。

● 丰富的API,可以对数据流进行操作。

● 提供了一种可读性更强的代码书写方式,使我们所写的代码可以更方便地得到维护。

● 与流相同,无消费,不执行。

● 消费者具备发信号通知生产者元素按需下发的能力(RxJava同样具备)。

2. Reactor的核心功能

Reactor项目的主要模块是reactor-core,这是一个专门用于支持响应式流规范的类库,其支持Java 8及后续版本。通过查看Reactor API,可知它和RxJava很像。Reactor 3是Reactor 2和RxJava的核心贡献开发者一起完成的一个混合版本,这也是本章把这几个东西放在一起介绍,然后分章讲解的很重要的原因,因为这样更易于理解。

Reactor与RxJava有相同的Publisher、Subscriber、Subscription和Processor核心接口。这里只简单介绍Publisher最常用的两个实现Mono和Flux,以及相关的操作符。

● Mono:表示一个特殊的Publisher,它可以发送0个或1个元素。

● Flux:表示一个特殊的Publisher,它可以发送0到n个元素。

● 操作符:元素在从Publisher发送给订阅者之前,可能会需要进行一些处理,包括转换、过滤操作等。

1.4.3 MongoDB简介

在后面的实战开发中,我们可能会用到MongoDB,官方提供的MongoDB Reactive Streams Java版本的驱动包API可以对MongoDB进行异步流处理,而且是无阻塞支持背压的。这里只是提一下,说明我们的数据库操作层面也开始做到了对响应式流API标准的支持。

1.4.4 响应式项目用例

前面说了那么多,大家可能依然有点不明白,那么为了更好地理解响应式系统(Reactive System),我们看看它与传统项目的不同之处。

以我们生活中的股票场景为例,我们需要看到股票信息的实时动态展示。这时我们会打开并保持一个页面,这个页面可以实时显示股票信息。开发人员需要做的是,将最新的数据更新到这个股票展示页面上。作为股民,面对的是“差之毫厘,失之千里”的局面。对他们来说,数据刷新得越及时,对决策越有利(在这里,我们只从响应式的角度来考虑这个问题,现实项目中会有基于WebSocket的实现,Spring MVC中也有SSE的实现)。

1. 传统开发模式

根据以往的开发经验,我们会主动地检查股票价格有没有变化,如果有变化,就从后台拉取最新的数据。如图1-1所示的流程图就代表着传统开发模式。

图1-1

在传统开发模式下,一旦开始渲染访问页面,就会每隔一段时间(图1-1中是1分钟)发送AJAX请求到后台的查询服务去请求股票价格数据。使用这种方式,无论股票价格是否真的发生变化,都会去请求,但无法保证股票价格的变化会被立刻传递到Web页面上。

2. 响应式开发模式

响应式开发模式通过事件驱动的方式将各个组件连接到一起,以实现在事件发生时其他组件可以立即进行响应。

也就是说,在加载股票价格页面后,这个页面会有一个专属ID注册到股票查询服务上。一旦使股票价格发生变化的事件产生,这个事件(Event)就会触发响应,最新的股票价格就会在Web页面上进行更新显示。如图1-2所示说明了整个流程。

图1-2

可以看到,响应式开发模式一般包括下面3个步骤。

(1)订阅(Subscribing)事件。

(2)事件的发生与传播。

(3)解除订阅。

在股票价格页面初始化加载的时候,其中有一个动作就是订阅当前使股票价格发生变化的事件源,可以认为事件源是消息中间件里的主题(Topic)(或者是我们订阅的一个RSS主题),而不同的响应式框架会有不同的具体方式,使用消息中间件也可以实现订阅。

在我们所关注的某只股票价格发生变化的时候,一个新的事件就会产生并分发给这个事件的订阅者。我们的Web页面会及时地接收并更新股票价格数据。而一旦Web页面关闭或者刷新,一个解除订阅的请求就会被发送至后台。

3. 传统开发模式和响应式开发模式的比较

可以看到,传统开发模式是比较简单的,而响应式开发模式需要我们实现一个订阅和事件传播链。如果事件的传播需要跨项目,也就是涉及其他项目,那么就可能会使用到消息中间件,这将会变得复杂,其并不属于本书的范围,此处不做讨论。

在传统开发模式中,更新股票价格页面主要是基于盲目的主动拉取来实现的,前端根本就不知道会不会有数据发生变化。这也就意味着无论后台数据有没有发生变化,前端都需要定时从后台拉取一次数据。而在响应式开发模式中,一旦注册订阅了价格变动事件,那么只有这只股票的价格发生变化才会触发一系列的操作,这样明显提高了程序性能和用户体验。

在传统开发模式中,这个例子中线程的生命周期会比较长,这也就意味着该线程所使用的资源在这个过程中会被线程锁定。考虑到同一时刻服务器会接收大量的请求,这样势必会造成更多的线程相互争夺资源。在响应式开发模式中,线程生存的时间短,这也就意味着争夺资源的情况较少,后面在本书中会针对类似场景进行相应的实战Demo展示。