www.hj8828.com 2

NodeJs中的stream(流)- 基础篇

什么是流?

Node.js中的流(Stream)介绍

 这篇文章主要介绍了Node.js中的流(Stream)介绍,本文讲解了什么是流、pipe方法、流的分类、Readable流状态的切换等内容,需要的朋友可以参考下

 

 

什么是流?

说到流,就涉及到一个*nix的概念:管道——在*nix中,流在Shell中被实现为可以通过
|(管道符)
进行桥接的数据,一个进程的输出(stdout)可被直接作为下一个进程的输入(stdin)。

在Node中,流(Stream)的概念与之类似,代表一种数据流可供桥接的能力。

pipe

流化的精髓在于 .pipe()方法。可供桥接的能力,在于数据流的两端(上游/下游
或称为 读/写流)以一个 .pipe()方法进行桥接。

www.hj8828.com 1

伪代码的表现形式为:

代码如下:

//上游.pipe(下游)
Readable.pipe(Writable);

 

流的分类

这里并不打算讨论所谓的Node v0.4
之前的“经典”流。那么,流分为这么几类(皆为抽象接口:

1.stream.Readable
可读流(需要实现_read方法,关注点在于对数据流读取的细节
2.stream.Writable
可写流(需要实现_write方法,关注点在于对数据流写入的细节
3.stream.Duplex
可读/写流(需要实现以上两接口,关注点为以上两接口的细节
4.stream.Transform
继承自Duplex(需要实现_transform方法,关注点在于对数据块的处理

简单来说:

1).pipe() 的拥有者一定具备 Readable 流(并不局限于)能力,它拥有
‘readable’/’data’/’end’/’close’/’error’ 一系列事件可供订阅,也提供
.read()/.pause()/.resume()等一系列方法供调用;
2).pipe() 的参数一定具备Writable 流(并不局限于 )能力,它拥有
‘drain’/’pipe’/’unpipe’/’error’/’finish’ 事件可供访问,也提供
.write()/.end() 等一系列方法供调用

什么鬼

有没有一丝丝焦虑?别急,做为一个说人话的低级码工,我会把Stream掰开了和您扯一扯的。

Stream类,在 Node.js的源码 里,是这么定义的:

代码如下:

var EE = require(‘events’).EventEmitter;
var util = require(‘util’);
util.inherits(Stream, EE);

function Stream() {
EE.call(this);
}

 

可以看出,本质上,Stream是一个EventEmitter,那意味着它具备事件驱动的功能(.emit/.on…)。众所周知,“Node.js
就是基于V8的事件驱动平台”,实现了事件驱动的流式编程,具备了和Node一样的异步回调的特征。

比如在 Readable 流中,有一个 readable
事件,在一个暂停的只读流中,只要有数据块准备好可读时,它就会被发送给订阅者(Readable
流有哪些呢?express中的
req,ftp或者mutli-form上传组件的req.part,系统中的标准输入
process.stdin等)。有了readable 事件,我们可以做个处理shell
命令输出的分析器之类的工具:

代码如下:

process.stdin.on(‘readable’, function(){
var buf = process.stdin.read();
if(buf){
var data = buf.toString();
// parsing data …
}
});

 

这样调用:

 

代码如下:

head -10 some.txt | node parser.js

 

对于 Readable 流,我们还可以订阅它的 data 和 end
事件,以获取数据块并在流枯竭时获得通知,如 经典socket示例 中那样:

 代码如下:

req.on(‘connect’, function(res, socket, head) {
socket.on(‘data’, function(chunk) {
console.log(chunk.toString());
});
socket.on(‘end’, function() {
proxy.close();
});
});

 

www.hj8828.com,Readable流状态的切换
需要注意的是,Readable 流有两种状态:flowing mode(激流) 和 pause
mode(暂停)。前者根本停不下来,谁被pipe上了就马上不停的给;后者会暂停,直到下游显式的调用
Stream.read() 请求才读取数据块。Readable 流初始化时是 pause mode的。

这两种状态可以互为切换的,其中,

有以下任一行为,pause 转 flowing:

1.对 Readable 流添加一个data事件订阅
2.对 Readable 调用 .resume() 显式开启flowing
3.调用 Readable 流的 .pipe(writable) ,桥接到一个 Writable 流上

有以下任一行为,flowing 转回 pause:

1.Readable 流还没有 pipe 到任何流上,可调 .pause() 暂停
2.Readable 流已经 pipe 到了流上,需 remove 掉所有 data
事件订阅,并且调用 .unpipe()方法逐一解除与下游流的关系

妙用

结合流的异步特性,我可以写出这样的应用:直接将 用户A 的输出桥接到 用户B
的页面上输出:

代码如下:

router.post(‘/post’, function(req, res) {
var destination = req.headers[‘destination’]; //发给谁
cache[destionation] = req;
//是的,并不返回,所以最好是个ajax请求
});

 

用户B请求的时候:

 

代码如下:

router.get(‘/inbox’, function(req, res){
var user = req.headers[‘user’];
cache.find(user, function(err, previousReq){ //找到之前存的req
var form = new multiparty.Form();
form.parse(previousReq); // 有文件给我
form.on(‘part’, function (part) {
part.pipe(res); //流式大法好:)

part.on(‘error’, function (err) {
console.log(err);
messaging.setRequestDone(uniqueID);
return res.end(err);
});
});
});
});

 

参考

how to write node programs with streams: stream-handbook

这篇文章主要介绍了Node.js中的流(Stream)介绍,本文讲解了什么是流、pipe方法、流的分类、Readable流状态的切换等…

一、什么是Stream(流)

流(stream)在 Node.js 中是处理流数据的抽象接口(abstract
interface)。 stream 模块提供了基础的 API 。使用这些 API
可以很容易地来构建实现流接口的对象。

流是可读的、可写的,或是可读写的。

说到流,就涉及到一个*nix的概念:管道——在*nix中,流在Shell中被实现为可以通过
| 进行桥接的数据,一个进程的输出可被直接作为下一个进程的输入。

相关文章

相关搜索:

今天看啥

搜索技术库

返回首页

  • CAD迷你看图如何查看设备清单
  • 如何玩转淘宝个性化搜索
  • 浅谈JavaScript中的Math.atan()方法的使用
  • JavaScript中的acos()方法使用详解
  • JavaScript中的Math.SQRT1_2属性使用简介
  • JavaScript中的Math.LOG2E属性使用详解

相关频道:
HTML/CSS  HTML5  Javascript  jQuery  AJax教程  前端代码  正则表达式  Flex教程  WEB前端教程  

二、NodeJs中的Stream的几种类型

Node.js 中有四种基本的流类型:

  • Readable – 可读的流(fs.createReadStream())
  • Writable – 可写的流(fs.createWriteStream())
  • Duplex – 可读写的流(net.Socket)
  • Transform – 在读写过程中可以修改和变换数据的 Duplex 流 (例如
    zlib.createDeflate())

NodeJs中关于流的操作被封装到了Stream模块中,这个模块也被多个核心模块所引用。

const stream = require('stream');

在 NodeJS 中对文件的处理多数使用流来完成

  • 普通文件
  • 设备文件(stdin、stdout)
  • 网络文件(http、net)

注:在NodeJs中所有的Stream(流)都是EventEmitter的实例

Example:

1.将1.txt的文件内容读取为流数据

const fs = require('fs');

// 创建一个可读流(生产者)
let rs = fs.createReadStream('./1.txt'); 

通过fs模块提供的createReadStream()可以轻松创建一个可读的文件流。但我们并有直接使用Stream模块,因为fs模块内部已经引用了Stream模块并做了封装。所以说
流(stream)在 Node.js
中是处理流数据的抽象接口,提供了基础Api来构建实现流接口的对象。

var rs = fs.createReadStream(path,[options]);

1.path 读取文件的路径

2.options

  • flags打开文件的操作, 默认为’r’
  • mode 权限位 0o666
  • encoding默认为null
  • start开始读取的索引位置
  • end结束读取的索引位置(包括结束位置)
  • highWaterMark读取缓存区默认的大小64kb

Node.js 提供了多种流对象。 例如:

  • HTTP 请求 (request response)
  • process.stdout 就都是流的实例。

2.创建可写流(消费者)处理可读流

将1.txt的可读流 写入到2.txt文件中 这时我们需要一个可写流

const fs = require('fs');
// 创建一个可写流
let ws = fs.createWriteStream('./2.txt');
// 通过pipe让可读流流入到可写流 写入文件
rs.pipe(ws); 

var ws = fs.createWriteStream(path,[options]);

1.path 读取文件的路径

2.options

  • flags打开文件的操作, 默认为’w’
  • mode 权限位 0o666
  • encoding默认为utf8
  • autoClose:true是否自动关闭文件
  • highWaterMark读取缓存区默认的大小16kb

pipe 它是Readable流的方法,相当于一个”管道”,数据必须从上游 pipe
到下游,也就是从一个 readable 流 pipe 到 writable 流。
后续将深入将介绍pipe。

www.hj8828.com 2

stream-1.png

如上图,我们把文件比作装水的桶,而水就是文件里的内容,我们用一根管子(pipe)连接两个桶使得水从一个桶流入另一个桶,这样就慢慢的实现了大文件的传输过程。

在Node中,流的概念与之类似,代表一种数据流可供桥接的能力。

帮客评论

三、为什么应该使用 Stream

当有用户在线看视频,假定我们通过HTTP请求返回给用户视频内容

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
    fs.readFile(videoPath, (err, data) => {
        res.end(data);
    });
}).listen(8080);

但这样有两个明显的问题

1.视频文件需要全部读取完,才能返回给用户,这样等待时间会很长
2.视频文件一次全放入内存中,内存吃不消

用流可以将视频文件一点一点读到内存中,再一点一点返回给用户,读一部分,写一部分。(利用了
HTTP 协议的 Transfer-Encoding: chunked
分段传输特性),用户体验得到优化,同时对内存的开销明显下降

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
    fs.createReadStream(videoPath).pipe(res);
}).listen(8080);

pipe

四、可读流(Readable Stream)

可读流(Readable streams)是对提供数据的源头(source)的抽象。

例如:

  • HTTP responses, on the client
  • HTTP requests, on the server
  • fs read streams
  • TCP sockets
  • process.stdin

所有的 Readable 都实现了 stream.Readable 类定义的接口。

流化的精髓在于 .pipe()方法。可供桥接的能力,在于数据流的两端以一个
.pipe()方法进行桥接。

可读流的两种模式(flowing 和 paused)

1.在 flowing 模式下, 可读流自动从系统底层读取数据,并通过 EventEmitter
接口的事件尽快将数据提供给应用。

2.在 paused 模式下,必须显式调用 stream.read()方法来从流中读取数据片段。

所有初始工作模式为paused的Readable流,可以通过下面三种途径切换为flowing模式:

  • 监听’data’事件
  • 调用stream.resume()方法
  • 调用stream.pipe()方法将数据发送到Writable

伪代码的表现形式为:复制代码
代码如下://上游.pipeReadable.pipe;

流动模式flowing

流切换到流动模式 监听data事件

const rs = fs.createReadStream('./1.txt');
const ws = fs.createWriteStream('./2.txt');
rs.on('data', chunk => {
    ws.write(chunk);
});
ws.on('end', () => {
    ws.end();
});

如果写入的速度跟不上读取的速度,有可能导致数据丢失。正常的情况应该是,写完一段,再读取下一段,如果没有写完的话,就让读取流先暂停,等写完再继续。

var fs = require('fs');
// 读取highWaterMark(3字节)数据,读完之后填充缓存区,然后触发data事件
var rs = fs.createReadStream(sourcePath, {
    highWaterMark: 3 
});
var ws = fs.createWriteStream(destPath, {
    highWaterMark: 3
});

rs.on('data', function(chunk) { // 当有数据流出时,写入数据
    if (ws.write(chunk) === false) { // 如果没有写完,暂停读取流
        rs.pause();
    }
});

ws.on('drain', function() { // 缓冲区清空触发drain事件 这时再继续读取
    rs.resume();
});

rs.on('end', function() { // 当没有数据时,关闭数据流
    ws.end();
});

或者使用更直接的pipe

fs.createReadStream(sourcePath).pipe(fs.createWriteStream(destPath));

流的分类

暂停模式paused

1.在流没有 pipe() 时,调用 pause() 方法可以将流暂停
2.pipe() 时,需要移除所有 data 事件的监听,再调用 unpipe() 方法

这里并不打算讨论所谓的Node v0.4
之前的“经典”流。那么,流分为这么几类(皆为抽象接口:

read(size)

流在暂停模式下需要程序显式调用 read() 方法才能得到数据。read()
方法会从内部缓冲区中拉取并返回若干数据,当没有更多可用数据时,会返回null。read()不会触发’data’事件。

使用 read() 方法读取数据时,如果传入了 size
参数,那么它会返回指定字节的数据;当指定的size字节不可用时,则返回null。如果没有指定size参数,那么会返回内部缓冲区中的所有数据。
NodeJS 为我们提供了一个 readable
的事件,事件在可读流准备好数据的时候触发,也就是先监听这个事件,收到通知又数据了我们再去读取就好了:

const fs = require('fs');
rs = fs.createReadStream(sourcePath);

// 当你监听 readable事件的时候,会进入暂停模式
rs.on('readable', () => {
    console.log(rs._readableState.length);
        // read如果不加参数表示读取整个缓存区数据
        // 读取一个字段,如果可读流发现你要读的字节小于等于缓存字节大小,则直接返回
        let ch = rs.read(1);
});

暂停模式 缓存区的数据以链表的形式保存在BufferList中

1.stream.Readable
可读流(需要实现_read方法,关注点在于对数据流读取的细节2.stream.Writable
可写流(需要实现_write方法,关注点在于对数据流写入的细节3.stream.Duplex
可读/写流(需要实现以上两接口,关注点为以上两接口的细节4.stream.Transform
继承自Duplex(需要实现_transform方法,关注点在于对数据块的处理

五、可写流(Writable Stream)

可写流是对数据流向设备的抽象,用来消费上游流过来的数据,通过可写流程序可以把数据写入设备,常见的是本地磁盘文件或者
TCP、HTTP 等网络响应。

Writable 的例子包括了:

  • HTTP requests, on the client
  • HTTP responses, on the server
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdin
  • process.stdout, process.stderr

所有 Writable 流都实现了 stream.Writable 类定义的接口。

process.stdin.pipe(process.stdout);

process.stdout 是一个可写流,程序把可读流 process.stdin
传过来的数据写入的标准输出设备。在了解了可读流的基础上理解可写流非常简单,流就是有方向的数据,其中可读流是数据源,可写流是目的地,中间的管道环节是双向流。

1).pipe() 的拥有者一定具备 Readable 流能力,它拥有
‘readable’/’data’/’end’/’close’/’error’ 一系列事件可供订阅,也提供
.read等一系列方法供调用;2).pipe() 的参数一定具备Writable
流能力,它拥有 ‘drain’/’pipe’/’unpipe’/’error’/’finish’
事件可供访问,也提供 .write 等一系列方法供调用

可写流使用

调用可写流实例的 write() 方法就可以把数据写入可写流

const fs = require('fs');
const rs = fs.createReadStream(sourcePath);
const ws = fs.createWriteStream(destPath);

rs.setEncoding('utf-8'); // 设置编码格式
rs.on('data', chunk => {
  ws.write(chunk); // 写入数据
});

监听了可读流的 data
事件就会使可读流进入流动模式,我们在回调事件里调用了可写流的 write()
方法,这样数据就被写入了可写流抽象的设备destPath中。

write() 方法有三个参数

  • chunk {String| Buffer},表示要写入的数据
  • encoding 当写入的数据是字符串的时候可以设置编码
  • callback 数据被写入之后的回调函数