最近正好处于一个相对悠闲的当口,想到要整理一下关于Node的一点知识。首先本人项目中并没有用过node,但是有时间就会关注一下学习一点。短短续续的学习导致的问题就是知识没有连贯性,由于不注意总结,本来之前花力气深入了解的知识过段时间又忘了。
痛定思痛,宁愿在烂笔头上花点时间,巩固记忆同时也便于回溯。

以下是我看过的一些比较好的相关文章:

nodejs stream官方文档

stream-handbook英文版
stream-handbook中文版
Nodejs Stream

不扯淡了,主题如题,就是对Node中Stream知识点的一个总结。还是我自己的老习惯,先列个知识点图谱,再来一项一项的展开:

  • 流是什么
  • 流的种类
  • 自定义流
  • 流的工作模式

在理解这个问题前我们首先要知道 流能干什么,解决了什么问题

// node中的IO会涉及到对文件的读写,以下代码中用户在接收到内容之前首先需要等待程序将文件
// 内容完全读入到内存中,造成的问题就是如果文件很大,不但消耗资源而且使用户连接缓慢,
// 影响用户体验
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/large-data.txt', function (err, data) {
        res.end(data);
server.listen(8000);

流能干什么?其实就是解决上述的问题:将数据分割成段,一段一段的读取,占用内存更小而且效率更高。

已上我们引出流(stream)的定义:流是对输入输出设备的抽象,数据源可理解为可读流,数据目的地可以理解为可写流。

调用node关于流的api对上面代码的改造就是:

var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/large-data.txt');
    stream.pipe(res);
server.listen(8000);
  1. 可读流 readable streams
  2. 可写流 writable streams
  3. 双向流 包括transform以及duplex

篇幅有限,这篇文章中先只整理可读流与可写流。

可读流栗子

Readable streams produce data that can be fed into a writable, transform, or duplex stream by calling .pipe()

var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);
rs.pipe(process.stdout);
$ node read0.js
beep boop

自定义可读流 (来自之前看的文章,我觉得很形象)

在node中自定义可读流的要点有两个:

  • 继承 sream 模块的 Readable 类
  • 重写 _read 方法
const Readable = require('stream').Readable;
class RandomNumberStream extends Readable {
    constructor(max) {
        super()
        this.max = max;
    _read() {
        const ctx = this;
        setTimeout(() => {
            if (ctx.max) {
                const randomNumber = parseInt(Math.random() * 10000);
                // 1) 只能 push 字符串或 Buffer,不能是数字
                // 2) push(null)结束推送
                // 3when .push() to a readable stream, the chunks pushed are buffered until a consumer is ready to read them.
                ctx.push(`${randomNumber}\n`);
                ctx.max -= 1;
            } else {
                ctx.push(null);
        }, 100);
module.exports = RandomNumberStream;
const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream();
rns.pipe(process.stdout); //输出(打印)到控制台

流的工作模式

流有两种工作模式: 流动模式 暂停模式

当处于流动模式时,数据由底层系统读出,并尽可能快地提供给程序;当处于暂停模式时,必须显式调用 stream.read() 来取出若干数据块。流默认处于暂停模式。

两种模式可以互相切换:

暂停模式 >> 流动模式:

  1. 监听’data’事件。
  2. 调用resume()方法。
  3. 调用pipe()方法将数据转接到另一个可写流。 (这也正是为什么在上面自定义可读流的_read方法中用了setTimeout而非setInterval,却能够源源不断地读取数据)

流动模式 >> 暂停模式:

  1. 如果调用了pipe(),移除data事件的监听,再调用unpipe()。
  2. 没有调用过pipe(),直接显式调用pause()方法。

暂停模式下的read()

When data is available, the ‘readable’ event fires and you can call .read() to fetch some data from the buffer.
When the stream is finished, .read() returns null because there are no more bytes to fetch.

readable event 解决问题就是读操作在可读流数据available的时候触发,避免客户读到空数据或者轮询可读流是否ready

const rns = new RandomNumberStream(5);
rns.on('readable', () => {
  let chunk;
  while((chunk = rns.read()) !== null){
    console.log(chunk);
 

可写流栗子

A writable stream is a stream you can .pipe() to but not from

使用方法主要是在可读流的data事件回调函数中调用writeStream.write方法

const fs = require('fs');
const readStream = fs.createReadStream('./source.js');
const writeStream = fs.createWriteStream('./dis.js');
readStream.setEncoding('utf-8');
readStream.on('data', chunk => {
  writeStream.write(chunk);
 

自定义可写流

  • 继承 stream 模块的 Writable 类
  • 重写 _write() 方法

write() 方法要点

  • highWaterMark 每次最多写入缓存区的数据量,默认值为 16kb
  • pipe的背压反馈机制(back pressure):

In unix, streams are implemented by the shell with | pipes. In node, the built-in stream module is used by the core libraries and can also be used by user-space modules. Similar to unix, the node stream module’s primary composition operator is called .pipe() and you get a backpressure mechanism for free to throttle writes for slow consumers.
Using .pipe() has other benefits too, like handling backpressure automatically so that node won’t buffer chunks into memory needlessly when the remote client is on a really slow or high-latency connection.

pipe背压反馈机制的原理: ‘drain’ event

pipe方法每次写数据的时候,都会判断是否写成功,如果失败,会等待可写流触发”drain”事件,表示可写流可以继续写数据了,然后pipe才会继续写数据。

所以以下代码在写入巨量数据的时候可能会发生内存泄露问题:

const rs = fs.createReadStream(filename);
rs.on('data', function (chunk) {
        res.write(chunk);
rs.on('end', function () {
        res.end();

改进方法如下:

const rs = fs.createReadStream(filename);
rs.pipe(res)

如果不用pipe的话,我们也可以利用“drain”事件的机制来手动控制读写流之间的平衡来避免内存泄漏:

const http = require("http");
const fs = require("fs");
const filename = "large-file.txt";
const serv = http.createServer(function (req, res) {
    const stat = fs.statSync(filename);
    res.writeHeader(200, {"Content-Length": stat.size});
    const readStream = fs.createReadStream(filename);
    readStream.on('data', function (chunk) {
        if(!res.write(chunk)){//判断写缓冲区是否写满
            readStream.pause();//如果写缓冲区已满,暂停读取数据
    readStream.on('end', function () {
        res.end();
    res.on("drain", function () {//写缓冲区一度堵塞,经过一段时间写后缓冲区恢复可写状态,触发"drain"事件
        readStream.resume();//重新启动读取数据
serv.listen(8888);

先暂时到这里,关于双工流Duplex和Transform下篇文章再总结。

node-memorystream - 该模块允许在内存中创建。 它可用于模拟文件、过滤/改变一个和另一个之间的数据、缓冲传入数据、作为可变速率的两个数据/网络之间的间隙等。 MemoryStream 支持读/写状态或仅读状态或仅写状态。 API 旨在遵循节点的 Stream 实现。 模块现在支持节点 > 0.10 的。 原始模块在这里 git://github.com/ollym/memstream.git 被重新制作和改进。 如果你安装了 npm,你可以简单地输入: npm install memorystream 或者您可以使用 git 命令克隆此存储库: git clone git://github.com/JSBizon/node-memorystream.git 一些如何使用内存模块的示例。 基本输入/输出操作 在这个例子中,我说明了内 探索Node.js实时媒体: kyriesent的node-rtsp-stream 项目地址:https://gitcode.com/kyriesent/node-rtsp-stream 在数字时代,实时媒体已成为我们日常生活的一部分,无论是在线会议、游戏直播还是远程教育。今天,我们将深入讨论一个开源项目——kyriesent/node-rtsp-stream,这是一个基于Node.js的库,... 我们先创建一个2.txt要写入的文件 let ws=fs.createWriteStream('./2.txt'); 文件不存储会创建 如果存在会先清空文件类容,再将文件写入第一个参数,写入路径第二个最高水位线 默认16K默认是utf8格... 在_read方法中,可以同步调用push(data),也可以异步调用。在上面的例子中,可读中的数据(0, 1)与可写中的数据(‘a’, ‘b’)是隔离开的,但在Transform中可写端写入的数据经变换后会自动添加到可读端。中维护了一个缓存,当缓存中的数据足够多时,调用read()不会引起_read()的调用,即不需要向底层请求数据。所以,如果_read异步调用push时发现缓存为空,则意味着当前数据是下一个需要的数据,且不会被read方法输出,应当在push方法中立即以data事件输出。     nodejs中大量的api与有关,曾经看到公司的一些大神的node代码,实现一个接口只需要pipe一下另一个java接口就可以了。简单的一行代码实在让人困惑。作为小白的自己一脸懵逼却又不敢问,因为根本不知道从何问起。现在终于通过学习,也能对说出个123,希望和大家共同交。     分为缓冲模式和对象模式,缓冲模式只能处理buffer或字符串,对象模式可以处理js对象。... 与Buffer的读/写操作类似,Stream中的可读可写也用于读/写操作。 使用文件进行文件复制,首先需要创建一个可读可读可以让用户在源文件中分块读取文件中的数据,然后再从可读中读取数据。在Node.js中,创建可读的语法如下: fs.createReadStream(path[, options]) 在上述语法中,path代表文件路径,options是一组key-va... Node.js操作按需数据使用sream API接口,stream 是一个数据集,数据可能不能马上全部获取到,他们在缓冲区,不需要在内存中。适合处理大数据集或者来自外部的数据源的数据块 Node中很多内建模块实现了式接口: 上面的列表中的原生Node.js对象就是可读可写的对象。有些对象是可读也是可写,如TCP sockets,zlib 和 crypto streams 这些对象是密... 使用http-server开启本地服务 前端开发中,经常会遇到要在浏览器中运行一个HTML页面,从本地文件夹中直接打开的一般都是file协议,如果代码中存在http或https的链接时,HTML页面就无法正常打开,此时,需要开启一个本地服务器来运行。 安装node.js 由于Node.js平台是在后端运行JavaScript代码,所以,必须首先在本机安装Node环境。 从官网地址,下载官网推荐版本,并安装。 安装完成后,在Windows环境下,打开命令提示符,然后输入: node -v 如果安装成功, 管道(Pipeline Stream)是一种特殊的操作,它通过将多个连接在一起,实现数据的连续处理和传输。在 Node.js 中,可以通过将多个连接在一起,形成一个管道,以便将数据从一个中传递到另一个中。Node.js 中的Stream)是一种处理数据的方式,它允许你以的方式处理数据,而不是一次性加载整个数据集。在 Node.js 中,可以通过将多个连接在一起,形成一个链式,以便将数据从一个传递到另一个。最后,我们将可读、转换可写连接起来,形成一个管道