在Unix中流是一个标准的概念,有标准的输入、输出和标准的错误

例如:

打印出所有的js文件交给grep 来过滤出包含http文件的内容,称之为Unix的管道

cat *.js | grep http

从上节得知Buffer是保存字节的数据,而流是用来暂存和移动数据的,它俩通常是结合起来来使用,我们来拷贝文件,像读取logo,是全部的读取入到内存中,然后再写入到文件中,对于体积比较大的的文件就不够用了假设我们的服务器需要不断的去读取文件,然后返回给客户端,同时又有好多人都在请求这个文件,这样每个请求都去读入一次内存,然后内存很快就爆掉了,最好的方式是边读边写

这就需要借助流来完成,那NodeJs中哪些模块涉及到了流;比如http、文件系统、压缩模块、tcp socket并且流是以buffer的形式存在,这样更高效。

改造logo图片读取操作

varfs=require('fs')varsource=fs.readFileSync('logo.png')fs.writeFileSync('stream_copy_logo.png',source)


但是这样的操作会不会太简单了,而没有办法精细的控制数据在流里面的传输,

以上这些都不用担心,Stream是基于事件机制进行工作的,

流在各个方面的变化都可以被我们监听到

varfs=require('fs')//声明一个可读流varreadStream=fs.createReadStream('logo_stream.js')//Stream在传输的时候会触发data事件readStream.on('data',function(chunk){console.log('dataemits')console.log(Buffer.isBuffer(chunk))console.log(chunk.toString('utf8'))})//还有readable事件,可读的.on('readable',function(){console.log('datareadable')}).on('end',function(){console.log('dataends')}).on('close',function(){console.log('dataclose')}).on('error',function(e){console.log('datareaderror:'+e)})

运行结果如下:


借助于Stream的事件机制,我们就能实现更多个性化的定制,

从而对流里面的流程进行更精细化的控制,改造上述代码:

varfs=require('fs')//声明一个可读流varreadStream=fs.createReadStream('logo_stream.js')varn=0//Stream在传输的时候会触发data事件readStream.on('data',function(chunk){n++console.log('dataemits')console.log(Buffer.isBuffer(chunk))//console.log(chunk.toString('utf8'))//流暂停readStream.pause()//设置定时器,模拟异步处理console.log('datapause')setTimeout(function(){console.log('datapauseend')//再重新启动readStream.resume()},3000)})//还有readable事件,可读的.on('readable',function(){console.log('datareadable')}).on('end',function(){console.log(n)console.log('dataends')}).on('close',function(){console.log('dataclose')}).on('error',function(e){console.log('datareaderror:'+e)})

运行效果如下:

换一个大一点的文件,3M左右的;打印结果如下:

大概每次是64kb

用事件的方式来重构复制图片的操作

varfs=require('fs')//放入一个大文件varreadStream=fs.createReadStream('1.pdf')varwriteStream=fs.createWriteStream('1_stream.pdf');//必然触发一个事件readStream.on('data',function(chunk){//写入目标if(writeStream.write(chunk)===false){//判断是否已经写入到目标,来解决爆仓console.log('stillcached')readStream.pause()}})readStream.on('end',function(){writeStream.end()})//耗尽方法writeStream.on('drain',function(){console.log('datadrains')readStream.resume()})/*这是个标准的文件的拷贝操作,但是会有问题;如果读的快,写的慢;因为读写的速度并不是恒定的,这个时候数据流内部的缓存可能会被爆仓,那应该怎么办*/

运行结果如下:

边读边写效果.



Stream的种类

Readable:可读流,用来提供数据;外部来源的数据会被存储到buffer里缓存起来,两种模式:流动模式,暂停模式

Writable:可写流,消费数据;

Duplex:双通流,可读可写

Transform:转换流,双通

各自事件,属性都大同小异.场景如下:请求一张图片的数据,在浏览器中显示出来

varhttp=require('http')varfs=require('fs')http.createServer(function(req,res){/*fs.readFile('logo.png',function(err,data){if(err){res.end('filenotexist')}else{res.wirteHeader(200,{'Context-Type':'text/html'})res.end(data)}})*///利用pipe就能够更简约的实现这套逻辑fs.createReadStream('logo.png').pipe(res)}).listen(8090)

运行效果如下:

不止是本地图片的读取,也可以是网络环境下的

使用NodeJs中的request模块

//使用之前先安装,npminstallrequestvarrequest=require('request')request('url').pipe(res)//这样就可以实现边下载边显示

运行结果同上。


在这里pipe方法会自动帮我们监听data和end事件,还可以自动控制后端压力,通过对内存空间的调度就能自动控制流量、避免掉目标被快速读取,只有末端真正需要数据的时候,数据才会从源头被取出来然后顺着管道一路走下去

再次重构读取pdf文件

//只需要2行代码varfs=require('fs')fs.createReadStream('1.pdf').pipe(fs.createWriteStream('1_pipe.pdf'))


pipe做通道连接时的例子:

varReadable=require('stream').ReadablevarWritable=require('stream').Writable//拿到两个实例varreadStream=newReadable()varwritStream=newWritable()//push一些数据readStream.push('I')readStream.push('Love')readStream.push('NodeJs')//读取完毕readStream.push(null)//重写方法writStream._write=function(chunk,encode,cb){console.log(chunk.toString())cb()}//最后,使用pipe连接起来readStream.pipe(writStream)

运行结果如下:


来实现一个定制的可读流,可写流、转换流

varstream=require('stream')varutil=require('util')//定制的可写流functionReadStream(){//首先改变它的上下文,让它可以调用Stream里面可读类的方法stream.Readable.call(this)}//来让我们声明的可读流继承流里面可读的原型util.inherits(ReadStream,stream.Readable)//然后就可以为可读流添加原型链上的read方法ReadStream.prototype._read=function(){//只干一件事,push数据this.push('I')this.push('Love')this.push('NodeJs')this.push(null)}//声明可写流functionWriteStream(){stream.Writable.call(this)//声明cachethis._cached=newBuffer('')}util.inherits(WriteStream,stream.Writable)WriteStream.prototype._write=function(chunk,encode,cb){console.log(chunk.toString())cb()}//声明转换流functionTransformStream(){stream.Transform.call(this)}util.inherits(TransformStream,stream.Transform)TransformStream.prototype._transform=function(chunk,encode,cb){this.push(chunk)cb()}//flushTransformStream.prototype._flush=function(cb){this.push('OhYear!')cb()}//生成实例varrs=newReadStream()varws=newWriteStream()varts=newTransformStream()//读到的数据pipe给转换流rs.pipe(ts).pipe(ws)

运行结果如下: