2.8 Stream
Stream模块为Node操作流式数据提供了支持。
Stream的思想最早见于早期的UNIX,在UNIX中使用“|”符号会创建一个匿名管道,其本质上也是一个Stream,用于两个程序(或者是设备)之间的数据传输。
2.8.1 Stream的种类
要使用Node的stream模块,需要增加引用:
在Nodejs中,一共有四种基础的stream类型:
- Readable:可读流(for example fs.createReadStream())。
- Writable:可写流(for example fs.createWriteStream())。
- Duplex:既可读,又可写(for example net.Socket)。
- Transform:操作写入的数据,然后读取结果,通常用于输入数据和输出数据不要求匹配的场景,例如zlib.createDeflate()。
我们重点介绍Readable和Writable这两种stream。
1.Readable Stream
Readable Stream定义的方法和事件如下所示:
- Event: 'close'
- Event: 'data'
- Event: 'end'
- Event: 'error'
- Event: 'readable'
- readable.isPaused()
- readable.pause()
- readable.pipe(destination[, options])
- readable.read([size])
- readable.resume()
- readable.setEncoding(encoding)
- readable.unpipe([destination])
- readable.unshift(chunk)
- readable.wrap(stream)
代码2.21 Readable stream的例子
2.Writeable Stream
Writeable Stream主要使用write方法来写入数据,API列表如下文所示:
write方法同样是异步的,假设我们创建一个可读流读取一个较大的文件,再调用pipe方法将数据通过一个可写流写入另一个位置。如果读取的速度大于写入的速度,那么Node将会在内存中缓存这些数据。
当然缓冲区也是有大小限制的(state.highWatermark),当达到阈值后,write方法会返回false,可读流也进入暂停状态,当writeable stream将缓冲区清空之后,会触发drain事件,上游的readable重新开始读取数据。
另一个比较重要的是pipe方法,其声明如下:
pipe方法相当于在可读流和可写流之间架起了桥梁,使得数据可以通过管道由可读流进入可写流。下面是使用pipe方法改写的静态文件服务器。
代码2.22 使用pipe改写的静态文件服务器
pipe方法接收一个writable对象,当readable对象调用pipe方法时,会在内部调用writable对象的write方法进行写入。
2.8.2 ReadLine
ReadLine是一个Node原生模块,该模块比较不起眼,提供了按行读取Stream中数据的功能。
下面是ReadLine模块的监听事件及方法:
- Event : 'close'
- Event : 'line'
- Event : 'pause'
- Event : 'resume'
- Event : 'SIGCONT'
- Event : 'SIGINT'
- Event : 'SIGTSTP'
- rl.close()
- rl.pause()
- rl.prompt([preserveCursor])
- rl.question(query, callback)
- rl.resume()
- rl.setPrompt(prompt)
- rl.write(data[, key])
该模块通常用来和stream搭配使用,但因为在实际项目中通常会定制自己的stream或者自定义读取方法,导致该模块的地位有些尴尬。下面是readLine的一个例子。
代码2.23 使用readLine模块读取文件
readLine并没有提供形如new readline()形式的构造方法,而是使用createInterface方法初始化了一个rl对象。
想象下有如下场景,一个可读流中包含了很多条独立的信息需要逐条处理,这可能是一个消息队列,这时使用readline模块就比较方便。
2.8.3 自定义Stream
在实际开发中,如果想要使用流式API,而原生的Stream又不能满足需求时,可以考虑实现自己的Stream类,常用的方法是继承原生的Stream类,然后做一些扩展。
下面我们拿Readable Stream为例来说明如何实现一个自定义的Stream。
上面的代码实现了名为MyReadable的类,它继承自Readable类,并且接受一个数组作为参数。
想要继承Readable类,就要在自定义的类内部实现_read方法,该方法内部使用push方法往可读流添加数据。
当我们给可读流对象注册data事件后,可读流会在nextTick中调用_read方法,并触发第一次data事件(读者可能会认为可读流开始读取是在调用构造函数之后,但此时data事件还未注册,可能会捕获不到最初的事件,因此可读流开始产生数据的操作是放在nextTick中的)。
当有消费者从readable中取数据时会自动调用该方法。在上面的例子里我们在_read方法里调用了push方法,该方法用来向可读流中填充数据,下面是一个消费者的例子:
每次触发data事件时都会得到相应的数组元素,当数组为空时,_read方法会被调用。即:
如果end事件被触发,则代表读取完毕。