新时期的Node.js入门
上QQ阅读APP看书,第一时间看更新

2.8 Stream

Stream模块为Node操作流式数据提供了支持。

Stream的思想最早见于早期的UNIX,在UNIX中使用“|”符号会创建一个匿名管道,其本质上也是一个Stream,用于两个程序(或者是设备)之间的数据传输。

2.8.1 Stream的种类

要使用Node的stream模块,需要增加引用:

在Nodejs中,一共有四种基础的stream类型:

  • Readable:可读流(for example fs.createReadStream())。
  • Writable:可写流(for example fs.createWriteStream())。
  • Duplex:既可读,又可写(for example net.Socket)。
  • Transform:操作写入的数据,然后读取结果,通常用于输入数据和输出数据不要求匹配的场景,例如zlib.createDeflate()。

我们重点介绍Readable和Writable这两种stream。

1.Readable Stream

Readable Stream定义的方法和事件如下所示:

  • Event: 'close'
  • Event: 'data'
  • Event: 'end'
  • Event: 'error'
  • Event: 'readable'
  • readable.isPaused()
  • readable.pause()
  • readable.pipe(destination[, options])
  • readable.read([size])
  • readable.resume()
  • readable.setEncoding(encoding)
  • readable.unpipe([destination])
  • readable.unshift(chunk)
  • readable.wrap(stream)

代码2.21 Readable stream的例子

2.Writeable Stream

Writeable Stream主要使用write方法来写入数据,API列表如下文所示:

write方法同样是异步的,假设我们创建一个可读流读取一个较大的文件,再调用pipe方法将数据通过一个可写流写入另一个位置。如果读取的速度大于写入的速度,那么Node将会在内存中缓存这些数据。

当然缓冲区也是有大小限制的(state.highWatermark),当达到阈值后,write方法会返回false,可读流也进入暂停状态,当writeable stream将缓冲区清空之后,会触发drain事件,上游的readable重新开始读取数据。

另一个比较重要的是pipe方法,其声明如下:

pipe方法相当于在可读流和可写流之间架起了桥梁,使得数据可以通过管道由可读流进入可写流。下面是使用pipe方法改写的静态文件服务器。

代码2.22 使用pipe改写的静态文件服务器

pipe方法接收一个writable对象,当readable对象调用pipe方法时,会在内部调用writable对象的write方法进行写入。

2.8.2 ReadLine

ReadLine是一个Node原生模块,该模块比较不起眼,提供了按行读取Stream中数据的功能。

下面是ReadLine模块的监听事件及方法:

  • Event : 'close'
  • Event : 'line'
  • Event : 'pause'
  • Event : 'resume'
  • Event : 'SIGCONT'
  • Event : 'SIGINT'
  • Event : 'SIGTSTP'
  • rl.close()
  • rl.pause()
  • rl.prompt([preserveCursor])
  • rl.question(query, callback)
  • rl.resume()
  • rl.setPrompt(prompt)
  • rl.write(data[, key])

该模块通常用来和stream搭配使用,但因为在实际项目中通常会定制自己的stream或者自定义读取方法,导致该模块的地位有些尴尬。下面是readLine的一个例子。

代码2.23 使用readLine模块读取文件

readLine并没有提供形如new readline()形式的构造方法,而是使用createInterface方法初始化了一个rl对象。

想象下有如下场景,一个可读流中包含了很多条独立的信息需要逐条处理,这可能是一个消息队列,这时使用readline模块就比较方便。

2.8.3 自定义Stream

在实际开发中,如果想要使用流式API,而原生的Stream又不能满足需求时,可以考虑实现自己的Stream类,常用的方法是继承原生的Stream类,然后做一些扩展。

下面我们拿Readable Stream为例来说明如何实现一个自定义的Stream。

上面的代码实现了名为MyReadable的类,它继承自Readable类,并且接受一个数组作为参数。

想要继承Readable类,就要在自定义的类内部实现_read方法,该方法内部使用push方法往可读流添加数据。

当我们给可读流对象注册data事件后,可读流会在nextTick中调用_read方法,并触发第一次data事件(读者可能会认为可读流开始读取是在调用构造函数之后,但此时data事件还未注册,可能会捕获不到最初的事件,因此可读流开始产生数据的操作是放在nextTick中的)。

当有消费者从readable中取数据时会自动调用该方法。在上面的例子里我们在_read方法里调用了push方法,该方法用来向可读流中填充数据,下面是一个消费者的例子:

每次触发data事件时都会得到相应的数组元素,当数组为空时,_read方法会被调用。即:

如果end事件被触发,则代表读取完毕。