03月28, 2018

node深入笔记(3)

第三篇开始。。

  • 流是一组有序的,有起点和终点的字节数据传输手段
  • 它不关心文件的整体内容,只关注是否从文件中读到了数据,以及读到数据之后的处理
  • 流是一个抽象接口,被 Node 中的很多对象所实现。比如HTTP 服务器request和response对象都是流。

流可以分为可读流与可写流。

可读流createReadStream

基础用法

const rs = fs.createReadStream(path,[options]);

简单demo:

let rs = fs.createReadStream(path.join(__dirname,'1.txt'),{
    flags:'r', // 文件的操作是读取操作
    encoding:'utf8', // 默认是null null代表的是buffer
    autoClose:true, // 读取完毕后自动关闭
    highWaterMark:3,// 默认是64k  64*1024b
    start:0, // 123 456 789  
    //end:3 // 包前又包后
});

源码中的fs.createReadStream是这样实现的:

alt

alt

highWaterMark是表示一次读入多少字节。

fs.createReadStream有以下监听事件:

alt

然后ReadStream类又继承了Readble。

alt

所以它又拥有下面这些事件:

alt

可读流有两种模式:

  • flowing(流动)
  • paused(暂停)

flowing

在 flowing 模式下, 可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用。举个例子:

rs.on('data',function(data){ 
    console.log(data);
});

我们在data事件中,可以通过rs.pause()来暂停读取。暂停了之后,通过rs.resume()可以恢复读取。

let fs = require('fs');
let path = require('path');
// 返回的是一个可读流对象
let rs = fs.createReadStream(path.join(__dirname,'1.txt'),{
    flags:'r', // 文件的操作是读取操作
    encoding:'utf8', // 默认是null null代表的是buffer
    autoClose:true, // 读取完毕后自动关闭
    highWaterMark:3,// 默认是64k  64*1024b
    start:0, // 123 456 789  
    //end:3 // 包前又包后
});
// rs.setEncoding('utf8');
rs.on('open',function(){
    console.log('文件打开了')
});
rs.on('close',function(){
    console.log('关闭')
});
rs.on('error',function(err){
    console.log(err);
});

// newLisenter
rs.on('data',function(data){ // 暂停模式 -> 流动模式
    console.log(data);
    rs.pause(); // 暂停方法 表示暂停读取,暂停data事件触发
    setTimeout(function() {
        rs.resume();
    }, 1000);
});

rs.on('end',function(){
    console.log('end')
});

上面的代码就是实现了一个简单的读取停一秒再读取再停一秒,直到读完为止。

paused

在 paused 模式下,必须显式调用 stream.read() 方法来从流中读取数据片段。举个例子:

let fs = require('fs');
let path = require('path');
let rs = fs.createReadStream(path.join(__dirname,'1.txt'),{
    highWaterMark:3
});
// 当我只要创建一个流 就会先把缓存区 填满,等待着你自己消费
// 如果当前缓存区被清空后会再次触发readable事件
// 当你消费小于 最高水位线时 会自动添加highWater这么多数据
rs.on('readable',function(){
    let result = rs.read(1);
    console.log(rs._readableState.length); // 缓存区的个数
    setTimeout(function(){
        console.log(rs._readableState.length);
    },1000)
});

alt

通过paused这种,我们可以实现一个行阅读器:

let fs = require('fs');
let EventEmitter = require('events');
let util = require('util');
util.inherits(LineReader, EventEmitter)
fs.readFile('./1.txt',function (err,data) {
    console.log(data);
})
function LineReader(path) {
    EventEmitter.call(this);
    this._rs = fs.createReadStream(path);
    this.RETURN = 0x0D;// \r 13
    this.NEW_LINE = 0x0A;// \n 10
    this.on('newListener', function (type, listener) {
        if (type == 'newLine') {
            let buffer = [];
            this._rs.on('readable', () => {
                let bytes;
                while (null != (bytes = this._rs.read(1))) {
                    let ch = bytes[0];
                    switch (ch) {
                        case this.RETURN:
                            this.emit('newLine', Buffer.from(buffer));
                            buffer.length = 0;
                            let nByte = this._rs.read(1);
                            if (nByte && nByte[0] != this.NEW_LINE) {
                                buffer.push(nByte[0]);
                            }
                            break;
                        case this.NEW_LINE:  // 如果是mac
                            this.emit('newLine', Buffer.from(buffer));
                            buffer.length = 0;
                            break;
                        default:
                            buffer.push(bytes[0]);
                            break;
                    }
                }
            });
            this._rs.on('end', () => {
                if (buffer.length > 0) {
                    this.emit('newLine', Buffer.from(buffer));
                    buffer.length = 0;
                    this.emit('end');
                }
            })
        }
    });
}

var lineReader = new LineReader('./1.txt');
lineReader.on('newLine', function (data) {
    console.log(data.toString());
}).on('end', function () {
    console.log("end");
})

深入rs.read方法

alt

alt

我们可以看到ReadStream确实有实现了这样一个方法:

alt

所以rs.read方法大致是这样的:

alt

可写流WriteStream

相比可读流ReadStream,我个人觉得要简单一些。

它就只有write和end方法,以及drain事件。

基础用法

let fs = require('fs');

let ws = fs.createWriteStream('./1.txt',{
    flags:'w',
    mode:0o666,
    autoClose:true,
    highWaterMark:3, // 默认写是16k
    encoding:'utf8',
    start:0
});
// 写入的数据必须是字符串或者buffer
ws.write(1+'','utf8',()=>{}); // 异步的方法

ws.write返回值

let fs = require('fs');

let ws = fs.createWriteStream('./1.txt',{
    flags:'w',
    mode:0o666,
    autoClose:true,
    highWaterMark:3, // 默认写是16k
    encoding:'utf8',
    start:0
});
// 写入的数据必须是字符串或者buffer
let flag = ws.write(1+'','utf8',()=>{}); // 异步的方法
console.log(flag);
flag = ws.write(1+'','utf8',()=>{}); // 异步的方法
console.log(flag);
flag = ws.write(1+'','utf8',()=>{}); // 异步的方法
console.log(flag);

上面的结果输出为:true true false。

flag并不是是否写入,而是能否继续写,但是返回false 也不会丢失,就是会把内容放到内存中。所以在当第3次的时候,就是3个字符已经写满了,因此返回false。

drain事件

写法如下:

ws.on('drain',function(){
    console.log('drain')
});

它的触发条件是:当都写入完后会触发drain事件。必须缓存区满了,满了后被清空了才会出发drain。

Pipe

有时候我们想实现一个边读边写的功能,类似管道那样。

于是就有了下面的一段代码:

let fs = require('fs');
let path = require('path');
let rs = fs.createReadStream(path.join(__dirname,'./1.txt'),{
    highWaterMark:4
});
let ws = fs.createWriteStream(path.join(__dirname,'./2.txt'),{
    highWaterMark:1
});

rs.on('data',function(chunk){ // chunk 读到的内容
    ws.write(chunk);
});

这样看似可以,其实是有问题的,因为读是读4个,写只能写1个,所以不断地会产生内存,越来越多。

因此需要写入1个后,暂停读入,直到下一个抽干了,内存中没有了,再继续读入。

let fs = require('fs');
let path = require('path');
let rs = fs.createReadStream(path.join(__dirname,'./1.txt'),{
    highWaterMark:4
});
let ws = fs.createWriteStream(path.join(__dirname,'./2.txt'),{
    highWaterMark:1
});

rs.on('data',function(chunk){ // chunk 读到的内容
    let flag = ws.write(chunk);
    if(!flag){
        rs.pause();
    }
});

ws.on('drain',function(){
    console.log('抽干了')
    rs.resume();
});

当然,官方api提供了更简单的方式:pipe,用法就是:

let fs = require('fs');
let path = require('path');
let rs = fs.createReadStream(path.join(__dirname,'./1.txt'),{
    highWaterMark:4
});
let ws = fs.createWriteStream(path.join(__dirname,'./2.txt'),{
    highWaterMa
rs.pipe(ws);

其实ReadStream源码中的实现就是这样的思路:

alt

流的代码实现思路

这里简单说一下,就是通过fs.open、fs.read、fs.write,然后加上EventEmitter实现的。

复杂的实现过程,其实在上一篇中有提及,createReadStream及createWriteStream无非是帮我们简化了这些打开、关闭、读的过程。

我们通过它的一些基础API,也可以实现一个简单版的ReadStream类及WriteStream类

自定义流

可读流

const { Readable } = require('stream');

// 想实现什么流 就继承这个流
// Readable里面有一个read()方法,默认掉_read()
// Readable中提供了一个push方法你调用push方法就会触发data事件
let index = 9;
class MyRead extends Readable{
    _read(){
        // 可读流什么时候停止呢? 当push null的时候停止
        if(index-->0)return this.push('123');
        this.push(null);
    }
}
let mr = new MyRead;
mr.on('data',function(data){
    console.log(data);
});

可写流

const { Writable } = require('stream');
// 可写流实现_write方法
class MyWrite extends Writable{
    _write(chunk,encoding,callback){
        console.log(chunk.toString());
        callback(); // clearBuffer
    }
}
let mw = new MyWrite();
mw.write('小翼','utf8',()=>{
    console.log(1);
})
mw.write('小翼','utf8',()=>{
    console.log(1);
});

可读写流(双工流)

const { Duplex } =  require('stream');
// 双工流 又能读 又能写,而且读取可以没关系(互不干扰);
class D extends Duplex {
    _read(){
        this.push('hello');
        this.push(null)
    }
    _write(chunk,encoding,callback){
        console.log(chunk.toString());
        callback();
    }
};
let d = new D();
d.on('data',function(data){
    console.log(data.toString());
});
d.write('hello');

转换流

类似Duplex双工流。

const { Transform } =  require('stream');

let tranform1 = Transform({
    transform(chunk,encoding,callback){
        this.push(chunk.toString().toUpperCase()); // 将输入的内容放入到可读流中
        callback();
    }
});
let tranform2 = Transform({
    transform(chunk,encoding,callback){
        console.log(chunk.toString());
        callback();
    }
});
// 等待你的输入
// rs.pipe(ws);
// 希望将输入的内容转化成大写在输出出来
process.stdin.pipe(tranform1).pipe(tranform2).pipe(process.stdout)

这个写法其实是有点像gulp的管道的。gulp的内部应该也是基于类似的实现。

其他流

还有一个对象流,但因为场景比较少,所以就不贴代码了。

结语

学了这些多东西,有些东西确实感觉还是懵懵懂懂的。不过真的感觉流这一块挺重要的。比如像tcp、http都会用到流,因此它们的常用方法还是得掌握一下。

本文链接:www.my-fe.pub/post/node-deep-note-3.html

-- EOF --

Comments

评论加载中...

注:如果长时间无法加载,请针对 disq.us | disquscdn.com | disqus.com 启用代理。