nodejs流
什么是流
流是nodejs中很重要的一个概念,是用于在 Node.js 中处理流数据的抽象接口,它能以一种以高效的方式处理读/写文件、网络通信、或任何类型的端到端的信息交换。
在传统的方式中,处理文件时会将文件的所有内容读取到内存中,然后再处理,假如遇到大文件,这种处理方式不仅低效还可能影响服务器性能,而流能解决这类问题,流每次都读取一部分内容到内存中处理,而不是一次性读取所有内容,这在处理大文件时很有用。
流的类型
Node.js 中有四种基本的流类型:
Writable
: 可以写入数据的流(例如,fs.createWriteStream()
)。
Readable
: 可以从中读取数据的流(例如,fs.createReadStream()
)。
Duplex
:Readable
和Writable
的流(例如,net.Socket
)。
Transform
: 可以在写入和读取数据时修改或转换数据的Duplex
流(例如,zlib.createDeflate()
)。
许多 Node.js 核心模块提供了原生的流处理功能,最值得注意的有:
process.stdin
返回连接到 stdin 的流。
process.stdout
返回连接到 stdout 的流。
process.stderr
返回连接到 stderr 的流。
fs.createReadStream()
创建文件的可读流。
fs.createWriteStream()
创建到文件的可写流。
net.connect()
启动基于流的连接。
http.request()
返回 http.ClientRequest 类的实例,该实例是可写流。
zlib.createGzip()
使用 gzip(压缩算法)将数据压缩到流中。
zlib.createGunzip()
解压缩 gzip 流。
zlib.createDeflate()
使用 deflate(压缩算法)将数据压缩到流中。
zlib.createInflate()
解压缩 deflate 流。
可读流
读取大文件
以读取文件为例
const reader = fs.createReadStream("./data.txt");
这段代码通过
fs.createReadStream()
创建了一个可读流,然后将data.txt
的内容片段化地读取出来。两种模式
可读流有两种模式:流动和暂停。
当创建reader时,可读流处于暂停
pause
模式,此时不会进行读取操作,但可以通过以下方式之一切换到流动模式:- 添加
'data'
事件句柄。
- 调用
stream.resume()
方法。
- 调用
stream.pipe()
方法将数据发送到Writable
。
当可读流处于流动模式时,可以通过
reader.pause()
或者reader.unpipe()
来转换成暂停模式。// 自动转成流动模式,此时会将内容打印到屏幕上 reader.pipe(process.stdout); // 如果执行下面命令,则会使reader流转成暂停模式 reader.pause(); // 或者 reader.unpipe(process.stdout);
常用事件
可读流提供一些事件,其中最值得注意的有
- error
- data
reader.on("data", (chunk) => { console.log(chunk.toString()); } // 类似于 // reader.pipe(process.stdout);
可写流
写入大文件
下面我们实现一个大文件的写入操作
const reader = fs.createReadStream("./data.txt"); const writer = fs.createWriteStream("./data2.txt"); reader.pipe(writer);
这段代码首先创造可读流读取了
data.txt
(假设是一个超大的文件),然后将可读流通过管道流入可写流中。这种是将可读流的内容流入到可写流中,当然你也可以直接写入可写流。
const writer = fs.createWriteStream("./data2.txt"); writer.write("hello world\n"); writer.write("hello node\n"); writer.end("end");
事件
可写流最值得关心的事件有
- error 在写入或管道数据时发生错误时触发
- drain
'drain'
事件将在适合继续将数据写入流时触发。
- finish 在调用
stream.end()
方法之后,并且所有数据都已刷新到底层系统时触发
const reader = fs.createReadStream(data1); const writer = fs.createWriteStream("./data2.txt"); writer.on("drain", () => { // 每次写入chunk时触发 // 如果内容体积太小,可以一次性写入,则不会触发该事件 console.log("drain"); }); writer.on("finish", () => { // 最后一次写入chunk时触发 console.log("finish"); }); reader.pipe(writer);
双工流和转换流
双工流是实现Readable和Writable的流,实际上就是实现
_read
和_write
的类,既能当做可读流又能当做可写流。转换流则是在双工流的基础上增加了transform方法,转换流会将输入经过transform后转换成输出。
Nodejs中的的压缩流和加密流都载内部实现了转换流。下面将以压缩流来演示转换流的工作方式。
const zlib = require("zlib"); const fs = require("fs"); fs.createReadStream("./data.txt") .pipe(zlib.createGzip()) .pipe(fs.createWriteStream("./test.txt.gz"));
通过fs来创建了可读流和可写流,目的是将
data.txt
内容读取出来然后保存到test.txt.gz
文件中,而在流的流动过程中会经历转换流zlib.createGzip()
,它的作用是将可读流的内容进行转换,并提供给后面的可写流。流的API
pipe()
readable.pipe()
方法将 Writable
流绑定到 readable
,使其自动切换到流动模式并将其所有数据推送到绑定的 Writable
。 数据流将被自动管理,以便目标 Writable
流不会被更快的 Readable
流漫过。注意事项如果Readable
流在处理过程中触发错误,则Writable
目标不会自动关闭。 如果发生错误,则需要手动关闭每个流以防止内存泄漏(通过this.destroy())。
unpipe()
reader.pipe(writer); reader.unpipe(writer);
destroy()
在流中触发的异常,并不会传播到
error
事件中,也不会自动关闭释放资源,但出现异常时,需要手动执行this.destory(err)
来销毁流。销毁流 可选地触发
'error'
事件,并且触发 'close'
事件(除非 emitClose
设置为 false
)。 在此调用之后,可读流将释放任何内部资源,随后对 push()
的调用将被忽略。所有流中出现的异常都必须调用
this.destroy
进行捕获,否则可能会导致内存泄露const myReadable = new Readable({ read(size) { fs.open("./xxx", (err) => { if (err) { this.destroy(err); } }); }, }); myReadable.on("error", (err) => { console.log("err", err); }); myReadable.on("close", () => { console.log("close"); }); myReadable.pipe(process.stdout);
_construct()
_construct
是流的一个内部方法,是可选的。_construct
方法接收一个参数callback
,当实现了_construct
方法时,_write()
、_final()
、_read()
和 _destroy()
等操作的调用都会被延迟,直到执行了callback
。callback
可以传递一个Error类型的参数。const myWritable = new Writable({ construct(callback) { try { this.fd = fs.openSync("./input.txt", "a+"); callback(); } catch (err) { callback(err); } }, write(chunk, encoding, callback) { fs.write(this.fd, chunk, (err) => { if (err) this.destroy(err); callback(); }); }, }); fs.createReadStream("./data.txt").pipe(myWritable);
push()
Readable上的方法,添加chunk到可读流中,可选指定encoding。
const myReadable = new Readable({ read(size) {}, }); myReadable.push("123\n"); myReadable.push("123\n"); myReadable.push(null); myReadable.push("123"); // 报错,不能在EOF后push新的数据 myReadable.pipe(process.stdout);
当
chunk
作为 null
传递时,表示流结束 (EOF),之后不能再写入数据。实现自定义流
除了使用nodejs内置的流外,我们还可以自己实现流。
实现流的方式
实现流有三种方式,分别是通过类、构造函数或对象模式来实现流。
const {Readable} = require('stream') // 类 class MyReadable extends Readable{ constructor(options){ super(options) } _read(){} } // 构造函数 const myReadable = new Readable() myReadable._read = function(){} // 对象模式 const myReadable = new Readable({ read(){} })
下面将统一采用对象模式或类的形式实现流。
实现可读流
实现可读流必须要实现
_read()
方法,以从底层资源中获取数据const myReadable = new Readable({ read(size){} })
调用
readable.read()
时,如果资源中的数据可用,则实现应开始使用 this.push(dataChunk)
方法将该数据推送到读取队列中。 一旦流准备好接受更多数据,则 _read()
将在每次调用 this.push(dataChunk)
后再次调用。 _read()
可能会继续从资源中读取并推送数据,直到 readable.push()
返回 false
。 只有当 _read()
停止后再次被调用时,它才能继续将额外的数据推入队列。size
参数是可选的,它的作用是确定每次push
的数据量。实现可写流
和可读流类似,可写流必须实现
_write
()方法 或/和 writev()
。const myWritable = new Writable({ write(chunk, encoding, callback) { console.log(chunk.toString()); }, }); process.stdin.pipe(myWritable);
writable
会持续传入片段数据(chunk
)并执行writable.write()
。 如果流实现能够同时处理多个数据块,则应实现 writable._writev()
方法。encoding表示chunk的编码,如果块是字符串,则
encoding
是该字符串的字符编码。 如果块是 Buffer
,或者如果流在对象模式下运行,则可以忽略 encoding
。如果需要监听
'drain'
事件,则必须执行callback函数,按照nodejs风格,第一个参数传Error,如果没有异常则传null。实现类似
fs.createWriteStream
的效果。const myWritable = new Writable({ write(chunk, encoding, callback) { fs.writeFile("./input.txt", chunk, (err) => { callback(err, chunk); }); }, }); process.stdin.pipe(myWritable);
实现双工流
双工流实际上就是实现
readabale
和writable
的流,因此我们需要实现duplex._read
和duplex._write
。const { Duplex } = require("stream"); class MyDuplex extends Duplex { _read(chunk) {} _write(chunk) { this.push(chunk); } } const myDuplex = new MyDuplex(); process.stdin.pipe(myDuplex).pipe(process.stdout);
process.stdin
获取输入流,然后通过pipe
流到myDuplex
中,此时myDuplex
是作为可写流,在这个过程中MyDuplex._write
会被多次调用来片段化地写入数据,我们在这里通过this.push()
将片段数据chunk
添加,然后myDuplex
就可以作为可读流来讲内容流入到可写流process.stdout
,即屏幕上。实现转换流
转换流在双工流的基础上实现了transform,它能够转换内容,例如前面讲过的压缩流。
const zlib = require("zlib"); const fs = require("fs"); fs.createReadStream("./data.txt") .pipe(zlib.createGzip()) // 转换 .pipe(fs.createWriteStream("./test.txt.gz"));
我们可以实现一个简单的转换流,将文本每一行内容的前面都加上行号
let count = 0; const myTransform = new Transform({ transform(chunk) { this.push(`[${count}] ` + chunk); }, }); fs.createReadStream("./data.txt").pipe(myTransform).pipe(process.stdout);