广告
返回顶部
首页 > 资讯 > 前端开发 > node.js >浅谈Node.js:理解stream
  • 692
分享到

浅谈Node.js:理解stream

浅谈Nodestream 2022-06-04 17:06:20 692人浏览 八月长安
摘要

Stream在node.js中是一个抽象的接口,基于EventEmitter,也是一种Buffer的高级封装,用来处理流数据。流模块便是提供各种api让我们可以很简单的使用Stream。 流分为四种类型,

Stream在node.js中是一个抽象的接口,基于EventEmitter,也是一种Buffer的高级封装,用来处理流数据。流模块便是提供各种api让我们可以很简单的使用Stream。

流分为四种类型,如下所示:

Readable,可读流 Writable,可写流 Duplex,读写流 TransfORM,扩展的Duplex,可修改写入的数据

1、Readable可读流

通过stream.Readable可创建一个可读流,它有两种模式:暂停和流动。

在流动模式下,将自动从下游系统读取数据并使用data事件输出;暂停模式下,必须显示调用stream.read()方法读取数据,并触发data事件。

所有的可读流最开始都是暂停模式,可以通过以下方法切换到流动模式:

监听'data'事件 调用stream.resume()方法 调用stream.pipe()方法将数据输出到一个可写流Writable

同样地,也可以切换到暂停模式,有两种方法:

如果没有设置pipe目标,调用stream.pause()方法即可。 如果设置了pipe目标,则需要移除所有的data监听和调用stream.unpipe()方法

在Readable对象中有一个_readableSate的对象,通过该对象可以得知流当前处于什么模式,如下所示:

readable._readableState.flowing = null,没有数据消费者,流不产生数据 readable._readableState.flowing = true,处于流动模式 readable._readableState.flowing = false,处于暂停模式

为什么使用流取数据

对于小文件,使用fs.readFile()方法读取数据更方便,但需要读取大文件的时候,比如几G大小的文件,使用该方法将消耗大量的内存,甚至使程序崩溃。这种情况下,使用流来处理是更合适的,采用分段读取,便不会造成内存的'爆仓'问题。

data事件

在stream提供数据块给消费者时触发,有可能是切换到流动模式的时候,也有可能是调用readable.read()方法且有有效数据块的时候,使用如下所示:


const fs = require('fs');

const rs = fs.createReadStream('./appbak.js');
var chunkArr = [],
  chunkLen = 0;
rs.on('data',(chunk)=>{
  chunkArr.push(chunk);
  chunkLen+=chunk.length;
});
rs.on('end',(chunk)=>{
  console.log(Buffer.concat(chunkArr,chunkLen).toString());
});

readable事件

当流中有可用数据能被读取时触发,分为两种,新的可用的数据和到达流的末尾,前者stream.read()方法返回可用数据,后者返回null,如下所示:


const rs = fs.createReadStream('./appbak.js');
var chunkArr = [],
  chunkLen = 0;

rs.on('readable',()=>{
  var chunk = null;
  //这里需要判断是否到了流的末尾
  if((chunk = rs.read()) !== null){
    chunkArr.push(chunk);
    chunkLen+=chunk.length;
  }
});
rs.on('end',(chunk)=>{
  console.log(Buffer.concat(chunkArr,chunkLen).toString());
});

pause和resume方法

stream.pause()方法让流进入暂停模式,并停止'data'事件触发,stream.resume()方法使流进入流动模式,并恢复'data'事件触发,也可以用来消费所有数据,如下所示:


const rs = fs.createReadStream('./下载.png');
rs.on('data',(chunk)=>{
  console.log(`接收到${chunk.length}字节数据...`);
  rs.pause();
  console.log(`数据接收将暂停1.5秒.`);
  setTimeout(()=>{
    rs.resume();
  },1000);
});
rs.on('end',(chunk)=>{
  console.log(`数据接收完毕`);
});

pipe(destination[, options])方法

pipe()方法绑定一个可写流到可读流上,并自动切换到流动模式,将所有数据输出到可写流,以及做好了数据流的管理,不会发生数据丢失的问题,使用如下所示:


const rs = fs.createReadStream('./app.js');
rs.pipe(process.stdout);

以上介绍了多种可读流的数据消费的方法,但对于一个可读流,最好只选择其中的一种,推荐使用pipe()方法。

2、Writable可写流

所有的可写流都是基于stream.Writable类创建的,创建之后便可将数据写入该流中。

write(chunk[, encoding][, callback])方法

write()方法向可写流中写入数据,参数含义:

chunk,字符串或buffer encoding,若chunk为字符串,则是chunk的编码 callback,当前chunk数据写入磁盘时的回调函数

该方法的返回值为布尔值,如果为false,则表示需要写入的数据块被缓存并且此时缓存的大小超出highWaterMark阀值,否则为true。

使用如下所示:


const ws = fs.createWriteStream('./test.txt');
ws.write('nihao','utf8',()=>{process.stdout.write('this chunk is flushed.');});
ws.end('done.')

背压机制

如果可写流的写入速度跟不上可读流的读取速度,write方法添加的数据将被缓存,逐渐增多,导致占用大量内存。我们希望的是消耗一个数据,再去读取一个数据,这样内存就维持在一个水平上。如何做到这一点?可以利用write方法的返回值来判断可写流的缓存状态和'drain'事件,及时切换可读流的模式,如下所示:


function copy(src,dest){
  src = path.resolve(src);
  dest = path.resolve(dest);
  const rs = fs.createReadStream(src);
  const ws = fs.createWriteStream(dest);
  console.log('正在复制中...');
  const stime = +new Date();
  rs.on('data',(chunk)=>{
    if(null === ws.write(chunk)){
      rs.pause();
    }
  });
  ws.on('drain',()=>{
    rs.resume();
  });
  rs.on('end',()=>{
    const etime = +new Date();
    console.log(`已完成,用时:${(etime-stime)/1000}秒`);
    ws.end();
  });
  function calcProgress(){
    
  }
}
copy('./CSS权威指南 第3版.pdf','./javascript.pdf');

drain事件

如果Writable.write()方法返回false,则drain事件将会被触发,上面的背压机制已经使用了该事件。

finish事件

在调用stream.end()方法之后且所有缓存区的数据都被写入到下游系统,就会触发该事件,如下所示:


const ws = fs.createWriteStream('./alphabet.txt');
const alphabetStr = 'abcdefghijklmnopqrstuvwxyz';
ws.on('finish',()=>{
  console.log('done.');
});
for(let letter of alphabetStr.split()){
  ws.write(letter);
}
ws.end();//必须调用

end([chunk][, encoding][, callback])方法

end()方法被调用之后,便不能再调用stream.write()方法写入数据,负责将抛出错误。

3、Duplex读写流

Duplex流同时实现了Readable与Writable类的接口,既是可读流,也是可写流。例如'zlib streams'、'crypto streams'、'tcp Sockets'等都是Duplex流。

4、Transform流

Duplex流的扩展,区别在于,Transform流自动将写入端的数据变换后添加到可读端。例如:'zlib streams'、'crypto streams'等都是Transform流。

5、四种流的实现

stream模块提供的API可以让我们很简单的实现流,该模块使用require('stream')引用,我们只要继承四种流中的一个基类(stream.Writable, stream.Readable, stream.Duplex, or stream.Transform),然后实现它的接口就可以了,需要实现的接口如下所示:

| Use-case | Class | Method(s) to implement |
| ------------- |-------------| -----|
| Reading only | Readable | _read |
| Writing only | Writable | _write, _writev |
| Reading and writing | Duplex | _read, _write, _writev |
| Operate on written data, then read the result | Transform | _transform, _flush |

Readable流实现

如上所示,我们只要继承Readable类并实现_read接口即可,,如下所示:


const Readable = require('stream').Readable;
const util = require('util');
const alphabetArr = 'abcdefghijklmnopqrstuvwxyz'.split();






const abReadable = Readable();
abReadable._read = function(){
  if (!alphabetArr.length) {
    this.push(null);
  } else {
    this.push(alphabetArr.shift());
  }
}
abReadable.pipe(process.stdout);

以上代码使用了四种方法创建一个Readable可读流,必须实现_read()方法,以及用到了readable.push()方法,该方法的作用是将指定的数据添加到读取队列。

Writable流实现

我们只要继承Writable类并实现_write或_writev接口,如下所示(只使用两种方法):



const myWritable = new Writable({
  write(chunk,encoding,callback){
    process.stdout.write(chunk);
    callback();
  }
});
myWritable.on('finish',()=>{
  process.stdout.write('done');
})
myWritable.write('a');
myWritable.write('b');
myWritable.write('c');
myWritable.end();

Duplex流实现

实现Duplex流,需要继承Duplex类,并实现_read和_write接口,如下所示:


class MyDuplex extends Duplex{
  constructor(){
    super();
    this.source = [];
  }
  _read(){
    if (!this.source.length) {
      this.push(null);
    } else {
      this.push(this.source.shift());
    }
  }
  _write(chunk,encoding,cb){
    this.source.push(chunk);
    cb();
  }
}

const myDuplex = new MyDuplex();
myDuplex.on('finish',()=>{
  process.stdout.write('write done.')
});
myDuplex.on('end',()=>{
  process.stdout.write('read done.')
});
myDuplex.write('nan');
myDuplex.write('cn');
myDuplex.end('bn');
myDuplex.pipe(process.stdout);

上面的代码实现了_read()方法,可作为可读流来使用,同时实现了_write()方法,又可作为可写流来使用。

Transform流实现

实现Transform流,需要继承Transform类,并实现_transform接口,如下所示:


class MyTransform extends Transform{
  constructor(){
    super();
  }
  _transform(chunk, encoding, callback){
    chunk = (chunk+'').toUpperCase();
    callback(null,chunk);
  }
}
const myTransform = new MyTransform();
myTransform.write('hello world!');
myTransform.end();
myTransform.pipe(process.stdout);

上面代码中的_transform()方法,其第一个参数,要么为error,要么为null,第二个参数将被自动转发给readable.push()方法,因此该方法也可以使用如下写法:


_transform(chunk, encoding, callback){
  chunk = (chunk+'').toUpperCase()
  this.push(chunk)
  callback();
}

Object Mode流实现

我们知道流中的数据默认都是Buffer类型,可读流的数据进入流中便被转换成buffer,然后被消耗,可写流写入数据时,底层调用也将其转化为buffer。但将构造函数的objectMode选择设置为true,便可产生原样的数据,如下所示:


const rs = Readable();
rs.push('a');
rs.push('b');
rs.push(null);
rs.on('data',(chunk)=>{console.log(chunk);});//<Buffer 61>与<Buffer 62>

const rs1 = Readable({objectMode:!0});
rs1.push('a');
rs1.push('b');
rs1.push(null);
rs1.on('data',(chunk)=>{console.log(chunk);});//a与b

下面利用Transform流实现一个简单的CSS压缩工具,如下所示:


function minify(src,dest){
  const transform = new Transform({
    transform(chunk,encoding,cb){
      cb(null,(chunk.toString()).replace(/[srnt]/g,''));
    }
  });
  fs.createReadStream(src,{encoding:'utf8'}).pipe(transform).pipe(fs.createWriteStream(dest));
}
minify('./reset.css','./reset.min.css');

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程网。

--结束END--

本文标题: 浅谈Node.js:理解stream

本文链接: https://www.lsjlt.com/news/12876.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 浅谈Node.js:理解stream
    Stream在node.js中是一个抽象的接口,基于EventEmitter,也是一种Buffer的高级封装,用来处理流数据。流模块便是提供各种API让我们可以很简单的使用Stream。 流分为四种类型,...
    99+
    2022-06-04
    浅谈 Node stream
  • 浅析Node.js 中 Stream API 的使用
    本文由浅入深给大家介绍node.js stream api,具体详情请看下文吧。 基本介绍 在 Node.js 中,读取文件的方式有两种,一种是用 fs.readFile ,另外一种是利用 fs.creat...
    99+
    2022-06-04
    js Node API
  • 浅谈Node.js中的定时器
    Node.js中定时器的实现 上一篇博文提到,在Node中timer并不是通过新开线程来实现的,而是直接在event loop中完成。下面通过几个JavaScript的定时器示例以及Node相关源码来分析在...
    99+
    2022-06-04
    定时器 浅谈 Node
  • 浅谈TypeScript 索引签名的理解
    目录1.什么是索引签名2. 索引签名语法3. 索引签名的注意事项3.1不存在的属性3.2 string 和 number 键4.索引签名与 Record<Keys, Type&...
    99+
    2022-11-12
    TypeScript 索引签名 TypeScript 索引签名
  • 浅谈Java注解和动态代理
    本文主要介绍Java中与注解和动态代理有关的部分知识,接下来我们看看具体内容。Annotation(注解)其实就是代码里的特殊标记, 它用于替代配置文件,也就是说,传统方式通过配置文件告诉类如何运行,有了注解技术后,开发人员可以通过注解告诉...
    99+
    2023-05-31
    java 注解 动态代理
  • 浅谈node.js中async异步编程
    1.什么是异步编程? 异步编程是指由于异步I/O等因素,无法同步获得执行结果时, 在回调函数中进行下一步操作的代码编写风格,常见的如setTimeout函数、ajax请求等等。 示例: for (v...
    99+
    2022-06-04
    浅谈 node async
  • 浅谈Node.js之异步流控制
    前言 在没有深度使用函数回调的经验的时候,去看这些内容还是有一点吃力的。由于Node.js独特的异步特性,才出现了“回调地狱”的问题,这篇文章中,我比较详细的记录了如何解决异步流问题。 文章会很长,而且这篇...
    99+
    2022-06-04
    浅谈 Node js
  • 浅谈Spring中IOC的理解和认知
    IOC的推导 1.1、模拟一个正常查询信息的业务流程: ①mapper层:因为没有连接数据库,这里我们写一个mapper的实现类来模拟数据的查询 public interface...
    99+
    2022-11-12
    Spring IOC
  • 【DB究谈】浅谈对数据库隔离级别的理解
    当人们提及数据库管理系统(DBMS),必会提及事务、ACID特性以及事务隔离级别。事务本身是为了保证系统的运行状态最终将处于一致性(满足一组约束条件)的状态而出现的概念,其中的ACID特性中的I(Isolation)要保证在并发操作情况下数...
    99+
    2021-06-20
    【DB究谈】浅谈对数据库隔离级别的理解
  • 浅谈SpringCloud之Ribbon详解
    目录一、什么是负载均衡二、实现负载均衡的三种方式三、Ribbon简介四、Ribbon的应用五、Ribbon和Feign的区别一、什么是负载均衡 负载均衡:建立在现有网络结构之上,它...
    99+
    2022-11-12
    SpringCloud Ribbon SpringCloud
  • 浅谈SpringSecurity基本原理
    目录一、SpringSecurity 本质二、典型过滤器2.1 FilterSecurityInterceptor2.2 ExceptionTranslationFilter2.3 ...
    99+
    2022-11-12
    SpringSecurity原理 Spring底层原理
  • 浅谈Java 代理机制
    目录一、常规编码方式二、代理模式概述三、静态代理3.1、什么是静态代理3.2、代码示例四、Java 字节码生成框架五、什么是动态代理六、JDK 动态代理机制6.1、使用步骤6.2、代...
    99+
    2022-11-12
    Java 静态代理 Java 动态代理
  • 浅谈MySQL之浅入深出页原理
    目录一、页的概览二、Infimum 和 Supremum三、使用Page Directory四、页的真实面貌4.1、File Header4.2、Page Header4.3、Infimum & Suprem...
    99+
    2022-05-19
    MySQL 页原理
  • 浅析Node.js的Stream模块中的Readable对象
    我一直都很不愿意扯 nodejs 的流,因为从第一次看到它我就觉得它的设计实在是太恶心了。但是没办法,Stream 规范尚未普及,而且确实有很多东西都依赖了 nodejs 的流来实现的,所以我也只能捏着鼻子...
    99+
    2022-06-04
    模块 对象 js
  • Java 浅谈 高并发 处理方案详解
    目录高性能开发十大必须掌握的核心技术I/O优化:零拷贝技术I/O优化:多路复用技术线程池技术无锁编程技术进程间通信技术Scale-out(横向拓展)缓存异步高性能、高可用、高拓展 解...
    99+
    2022-11-12
    Java高并发 Java处理方案
  • 浅谈对于DAO设计模式的理解
    为了降低耦合性,提出了DAO封装数据库操作的设计模式。它可以实现业务逻辑与数据库访问相分离。相对来说,数据库是比较稳定的,其中DAO组件依赖于数据库系统,提供数据库访问的接口。一般的DAO的封装由以下另个原则: · 一个表对应一个...
    99+
    2023-05-31
    dao 设计模式 %d
  • 浅谈node.js中间件有哪些类型
    目录概述1、应用级中间件2、内置中间件3、第三方中间件(1)body-parser,将post请求数据解析为对象(2)mysql模块概述 node中间件就是封装在程序中处理http请...
    99+
    2022-11-12
    node 中间件
  • 浅谈Java解释器模式
    ​ **请注意!请注意!!!**今天讲给大家讲解非常“有用”的设计模式,解释器模式!!! ​ 设计模式有三大种类,一种是创建型模式,一种是结构型模式,最后一种...
    99+
    2022-11-12
    Java模式 Java解释器模式
  • 浅谈mysql join底层原理
    目录join算法驱动表和非驱动表的区别1、Simple Nested-Loop Join,简单嵌套-无索引的情况2、Index Nested-Loop Join-有索引的情况3、Block Nested-Loop J...
    99+
    2022-05-30
    mysql join底层原理 mysql join
  • 浅谈Python的异常处理
    Python的异常处理能力是很强大的,可向用户准确反馈出错信息。在Python中,异常也是对象,可对它进行操作。所有异常都是基类Exception的成员。所有异常都从基类Exception继承,而且都在e...
    99+
    2022-06-04
    浅谈 异常 Python
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作