Nodejs中的可讀流的作用和實(shí)現(xiàn)方法

這篇文章主要介紹“Nodejs中的可讀流的作用和實(shí)現(xiàn)方法”,在日常操作中,相信很多人在Nodejs中的可讀流的作用和實(shí)現(xiàn)方法問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”Nodejs中的可讀流的作用和實(shí)現(xiàn)方法”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

成都創(chuàng)新互聯(lián)公司-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價(jià)比貢覺(jué)網(wǎng)站開(kāi)發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫(kù),直接使用。一站式貢覺(jué)網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋貢覺(jué)地區(qū)。費(fèi)用合理售后完善,十多年實(shí)體公司更值得信賴。

stream的概念

流(stream)是 Node.js 中處理流式數(shù)據(jù)的抽象接口。 stream 模塊用于構(gòu)建實(shí)現(xiàn)了流接口的對(duì)象?!就扑]學(xué)習(xí):《nodejs 教程》】

stream的作用

讀寫(xiě)大文件的過(guò)程中,不會(huì)一次性的讀寫(xiě)到內(nèi)存中??梢钥刂泼看巫x寫(xiě)的個(gè)數(shù)

stream的分類

1、可讀流-Readable

例:fs.createReadStream;

源碼位置:lib/_stream_readable.js

2、可寫(xiě)流-Writable

例:fs.createWriteStream;

源碼位置:lib/_stream_writable.js

3、雙工流-Duplex:滿足讀寫(xiě)的功能

例:net.Socket();

源碼位置:lib/_stream_duplex.js

4、轉(zhuǎn)化流-Transform:用途:壓縮,轉(zhuǎn)碼

例:

const { Transform } = require('stream');
Transform.call(this, '要轉(zhuǎn)換的數(shù)據(jù)');//具體的使用詳情 見(jiàn)node官網(wǎng)

-源碼位置:lib/_stream_tranform.js

可讀流讀取文件的過(guò)程

  • 讀取文件代碼過(guò)程

const path = require("path");
const aPath = path.join(__dirname, "a.txt");//需要讀取的文件
const fs = require("fs");
let rs = fs.createReadStream(aPath, {
  flags: "r",
  encoding: null,//默認(rèn)編碼格式是buffer,深挖buffer又要學(xué)習(xí)字符編碼,留個(gè)坑 到時(shí)候?qū)懸粋€(gè)編碼規(guī)范的學(xué)習(xí)整理
  autoClose: true,//相當(dāng)于需要調(diào)用close方法,如果為false  文件讀取end的時(shí)候 就不會(huì)執(zhí)行 close
  start: 0,
  highWaterMark: 3,//每次讀取的個(gè)數(shù) 默認(rèn)是64*1024個(gè)字節(jié)
});

rs.on("open", function (fd) {
  // fd  number類型
  console.log("fd", fd);
});
// 他會(huì)監(jiān)聽(tīng)用戶,綁定了data事件,就會(huì)觸發(fā)對(duì)應(yīng)的回調(diào),不停的觸發(fā)
rs.on("data", function (chunk) {
//這里會(huì)打印的是ascII 值 ,所以可以toString查看詳情自己看得懂的樣子
  console.log({ chunk }, "chunk.toString", chunk.toString()); 
  //如果想每一段事件 讀一點(diǎn) 可以用rs.pause() 做暫停,然后計(jì)時(shí)器 里rs.resume()再次觸發(fā)data事件
  rs.pause();//暫停讀取
});
rs.on("close", function () {
  //當(dāng)文件讀取完畢后 會(huì) 觸發(fā) end事件
  console.log("close");
});
setInterval(() => {
  rs.resume(); //再次觸發(fā)data,直到讀完數(shù)據(jù)為止
}, 1000);
  • 題外話:想說(shuō)下 文件流和普通可讀流的區(qū)別

1、open 和close是文件流獨(dú)有,支持open和close便是文件流

2、可讀流都具備 (on('data'),on('end'),on('error'),resume,pause;所以只要支持這些方法就是可讀流

可寫(xiě)流寫(xiě)入文件的過(guò)程

  • 寫(xiě)入文件代碼過(guò)程

const fs = require("fs");
const path = require("path");
const bPath = path.join(__dirname, "b.txt");
let ws = fs.createWriteStream(bPath, {
//參數(shù)和可讀流的類似
  flags: "w",
  encoding: "utf-8",
  autoClose: true,
  start: 0,
  highWaterMark: 3,
});
ws.on("open", function (fd) {
  console.log("open", fd);
});
ws.on("close", function () {
  console.log("close");
});

//write的參數(shù)string 或者buffer,ws.write 還有一個(gè)boolea的返回值表示是真實(shí)寫(xiě)入文件還是放入緩存中
ws.write("1");
let flag = ws.write("1");
console.log({ flag });//true
flag = ws.write("1");
console.log({ flag });//true
flag = ws.write("1");
console.log({ flag });//false

雙工流的寫(xiě)入和讀取過(guò)程

  • 寫(xiě)一個(gè)本地服務(wù) 做例子

1、server(服務(wù)器代碼)實(shí)現(xiàn)

const net = require("net"); //net 模塊是 node自己封裝的tcp層
//socket 就是雙工流 能讀能寫(xiě)  http源碼就是用net模塊寫(xiě)的 基于tcp
const server = net.createServer(function (socket) {
  socket.on("data", function (data) {//監(jiān)聽(tīng)客戶端發(fā)來(lái)的消息
    console.log(data.toString)
    socket.write("server:hello");//寫(xiě)入server:hello
  });
  socket.on("end", function () {
    console.log("客戶端關(guān)閉");
  });
});
server.on("err", function (err) {
  console.log(err);
});
server.listen(8080);//服務(wù)端監(jiān)聽(tīng)8080端口

2、client(客戶端) 實(shí)現(xiàn)

const net = require("net"); //net 模塊是 node自己封裝的tcp層
const socket = new net.Socket(); //
socket.connect(8080, "localhost"); //  表示鏈接服務(wù)器本地8080端口
socket.on("connect", function (data) {
  //和服務(wù)器建立鏈接后
  socket.write("connect server");
});
socket.on("data", function (data) {
  //監(jiān)聽(tīng)數(shù)據(jù),讀取服務(wù)器傳來(lái)的數(shù)據(jù)
  console.log(data.toString());
  socket.destroy()
});
socket.write('ok')
socket.on("error", function (err) {
  console.log(err);
});

3.題外話 如果想看tcp的三次握手和四次揮手 可以 通過(guò)我上述代碼 用wireshark(一個(gè)抓包工具)看實(shí)際過(guò)程

轉(zhuǎn)化流 transform過(guò)程

轉(zhuǎn)化流是雙工流的一種, 允許實(shí)現(xiàn)輸入,并在對(duì)數(shù)據(jù)執(zhí)行某些操作后返回輸出,兩者有依賴關(guān)系

  • 代碼過(guò)程(這個(gè)例子我的參考來(lái)處)

const stream = require('stream')
let c = 0;
const readable = stream.Readable({
  highWaterMark: 2,
  read: function () {
    let data = c < 26 ? Number(c++ + 97) : null;
    console.log('push', data);
    this.push( String.fromCharCode(data));
}
})

const transform = stream.Transform({
  highWaterMark: 2,
  transform: function (buf, enc, next) {
    console.log('transform', buf.toString());
    next(null, buf);
  }
})

readable.pipe(transform);
  • 打印結(jié)果

Nodejs中的可讀流的作用和實(shí)現(xiàn)方法

可讀流的實(shí)現(xiàn)

跟著斷點(diǎn)先了解 可讀流的調(diào)用過(guò)程

就前面可讀流文件的讀取過(guò)程的代碼為例子 打斷點(diǎn)

rs.on('open')

rs.on('open')為斷點(diǎn)入口進(jìn)入

Nodejs中的可讀流的作用和實(shí)現(xiàn)方法

1、通過(guò)Stream.prototype.on.call 繼承Stream類

源文件位置:no dlib/_stream_readable.js(我是通過(guò)斷點(diǎn)點(diǎn)到這里 直接找,我也沒(méi)找到)

Nodejs中的可讀流的作用和實(shí)現(xiàn)方法

  • 再點(diǎn)進(jìn)去 發(fā)現(xiàn) Stream 是EventEmitter的子類 那么 可讀流也可以支持發(fā)布訂閱

Nodejs中的可讀流的作用和實(shí)現(xiàn)方法

2、監(jiān)聽(tīng)的事件類型是否是data和readable任意一個(gè) 不是 繼續(xù) 下一個(gè)事件的監(jiān)聽(tīng)

Nodejs中的可讀流的作用和實(shí)現(xiàn)方法

rs.on('data')

Nodejs中的可讀流的作用和實(shí)現(xiàn)方法

  • data的部分做兩件事

    1、判斷flowing(默認(rèn)值是null)不為false 就自動(dòng)resume方法執(zhí)行繼續(xù) 文件讀?。ㄟ@里我的案例是rs.pause();手動(dòng)將flowing 值為false了所以不會(huì)繼續(xù)調(diào)用)

    2、那如果我沒(méi)有調(diào)用rs.pause() 會(huì)繼續(xù)調(diào)用resume 看看resume里做了什么

Nodejs中的可讀流的作用和實(shí)現(xiàn)方法

2.1 最終調(diào)用了 stream.read()繼續(xù)讀取文件;直到文件讀取結(jié)束依次去emit end 和close事件

小結(jié):所以data默認(rèn)是會(huì)不斷的讀取文件直到文件讀取完畢 ,如果想要文件讀取變可控可以和我一樣用rs.pause()

自己實(shí)現(xiàn)

實(shí)現(xiàn)思路

繼承EventEmitter發(fā)布訂閱管理我們的事件

const fs = require("fs");
const EventEmitter = require("events");
class ReadStream extends EventEmitter {

}
module.exports = ReadStream;

數(shù)據(jù)初始化

constructor(path, options = {}) {
    super();
    //參考fs 寫(xiě)實(shí)例需要用到的參數(shù)
    this.path = path;
    this.flags = options.flags || "r";
    this.encoding - options.encoding || null;//默認(rèn)編碼格式是buffer
    this.autoClose = options.autoClose || true;//相當(dāng)于需要調(diào)用close方法,如果為false  文件讀取end的時(shí)候 就不會(huì)執(zhí)行 close
    this.start = options.start || 0;//數(shù)據(jù)讀取的開(kāi)始位置
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;//默認(rèn)一次讀取64個(gè)字節(jié)的數(shù)據(jù) 
    this.offset = this.start;//fs.read的偏移量
    this.fd = undefined; //初始化fd 用于 open成功后的fd做賦值  供 read里使用
    this.flowing = false;//實(shí)現(xiàn)pause和resume備用,設(shè)置flag,當(dāng)監(jiān)聽(tīng)到data事件的時(shí)候 改 flowing為true,
    this.open(); //初始化的時(shí)候就要調(diào)用open
    this.on("readStreamListener", function (type) {
      // console.log(type)//這里打印就能看到 實(shí)例上所有 通過(guò)on 綁定的事件名稱
      if (type === "data") {
      //監(jiān)聽(tīng)到data事件的時(shí)候 改 flowing為true
        this.flowing = true;
        this.read();
      }
    });
    }

文件讀取方法read,pause,resume,open和destroy的實(shí)現(xiàn)

open()
 open() {
 // 調(diào)用fs.open 讀取目標(biāo)文件 
    fs.open(this.path, this.flags, (err, fd) => { 
      this.fd = fd; //賦值一個(gè)fd 供后面的 read()方式使用,文件讀取成功,fd是返回一個(gè)數(shù)字
      this.emit("open", fd);
    });
read()
 read() {
   // console.log("一開(kāi)始read里的", this.fd); //但是這樣依舊拿不到 open后的fd,用 發(fā)布訂閱 通過(guò)on來(lái)獲取 綁定的事件type
    //這里要做一個(gè)容錯(cuò)處理 ,因?yàn)閛pen是異步讀取文件,read里無(wú)法馬上拿到open結(jié)果
  if (typeof this.fd !== "number") {
      //訂閱open,給綁定一個(gè)回調(diào)事件read 直到this.fd有值
      return this.once("open", () => this.read());
    }
 }
  //fd打開(kāi)后 調(diào)用fs.read
  //實(shí)例上的start值是未知number,存在實(shí)際剩余的可讀的文件大小<highWaterMar的情況 ,用howMuchToRead 替換highWaterMark 去做fs.read的每次讀取buffer的大小
    let howMuchToRead = this.end
      ? Math.min(this.end - this.offset + 1, this.highWaterMark)
      : this.highWaterMark;
  //定義一個(gè)用戶 傳進(jìn)來(lái)的highWaterMark 大小的buffer對(duì)象
    const buffer = Buffer.alloc(this.highWaterMark);
       //讀取文件中的內(nèi)容fd給buffer 從0位置開(kāi)始,每次讀取howMuchToRead個(gè)。插入數(shù)據(jù),同時(shí)更新偏移量
    fs.read(
      this.fd,
      buffer,
      0,
      howMuchToRead,
      this.offset,
      (err, bytesRead) => {
        if (bytesRead) {
          // 每讀完一次,偏移量=已經(jīng)讀到的數(shù)量
          this.offset += bytesRead;
          this.emit("data", buffer.slice(0, bytesRead));
          //寫(xiě)到這里實(shí)例上的data 已經(jīng)可以打印出數(shù)據(jù)了 但是 繼續(xù)讀取 調(diào)用this.read() 直到bytesRead不存在 說(shuō)明數(shù)據(jù)讀取完畢了 走else
          //回調(diào) this.read();時(shí)候判斷 this.flowing 是否為true
          //pause調(diào)用后this.flowing將為false
          if (this.flowing) {
            this.read();
          }
        } else {
          // 執(zhí)行到這 bytesRead不存在說(shuō)明  文件數(shù)據(jù)讀取完畢了已經(jīng) 觸發(fā)end
          this.emit("end");//emit 實(shí)例上綁定的end事件
          //destroy 還沒(méi)寫(xiě)到 稍等 馬上后面就實(shí)現(xiàn)...
          this.destroy();
        }
      }
    );
resume()

文件讀取不去data事件,會(huì)觸發(fā)對(duì)應(yīng)的回調(diào),不停的觸發(fā) 所以想要變可控可以手動(dòng)調(diào)用 resume()& pause()

  • pause的實(shí)現(xiàn),調(diào)用的時(shí)候設(shè)置 this.flowing=false,打斷 read()

  pause() {
    this.flowing = false;
  }
pause()
  • pause 打斷 read()多次讀取,可以使用resume 打開(kāi) this.flowing=true 并調(diào)用read

resume() {
    if (!this.flowing) {
      this.flowing = true;
      this.read();
    }
  }
destroy()
  • 文件open不成功時(shí)候拋錯(cuò)時(shí)調(diào)用

  • 文件讀取完畢后&&this.autoClose===true ,read()里文件讀取end的時(shí)候 就執(zhí)行close

  destroy(err) {
    if (err) {
      this.emit("error");
    }
    // 把close放destroy里 并 在read里調(diào)用
    if (this.autoClose) {
      fs.close(this.fd, () => {
        this.emit("close");
      });
    }
  }
完整代碼
  • 實(shí)現(xiàn)代碼

/**
 *實(shí)現(xiàn)簡(jiǎn)單的可讀流
 */

const fs = require("fs");
const EventEmitter = require("events");
class ReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    //參考fs 寫(xiě)實(shí)例需要用到的參數(shù)
    this.path = path;
    this.flags = options.flags || "r";
    this.encoding - options.encoding || null;
    this.autoClose = options.autoClose || true;
    this.start = options.start || 0;
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.fd = undefined;
    this.offset = this.start;
    this.flowing = false;
    this.open(); 
    this.on("newListener", function (type) {
      if (type === "data") {
        this.flowing = true;
        this.read();
      }
    });
  }
  destroy(err) {
    if (err) {
      this.emit("error");
    }
    if (this.autoClose) {
      fs.close(this.fd, () => {
        this.emit("close");
      });
    }
  }
  open() {
    fs.open(this.path, this.flags, (err, fd) => {
      if (err) {
        return this.destroy(err);
      }
      this.fd = fd;
      this.emit("open", fd);
    });
  }
  resume() {
    if (!this.flowing) {
      this.flowing = true;
      this.read();
    }
  }
  pause() {
    this.flowing = false;
  }

  read() {
    if (typeof this.fd !== "number") {
      return this.once("open", () => this.read());
    }
    let howMuchToRead = this.end
      ? Math.min(this.end - this.offset + 1, this.highWaterMark)
      : this.highWaterMark;
    const buffer = Buffer.alloc(this.highWaterMark);
    fs.read(
      this.fd,
      buffer,
      0,
      howMuchToRead,
      this.offset,
      (err, bytesRead) => {
        if (bytesRead) {
          this.offset += bytesRead;
          this.emit("data", buffer.slice(0, bytesRead));
          if (this.flowing) {
            this.read();
          }
        } else {
          this.emit("end");
          this.destroy();
        }
      }
    );
  }
}

module.exports = ReadStream;
  • 調(diào)用代碼

const ReadStream = require("./initReadStream");
let rs = new ReadStream(aPath, {
  flags: "r",
  encoding: null, //默認(rèn)編碼格式是buffer
  autoClose: true, //相當(dāng)于需要調(diào)用close方法,如果為false  文件讀取end的時(shí)候 就不會(huì)執(zhí)行 close
  start: 0,
  highWaterMark: 3, //每次讀取的個(gè)數(shù) 默認(rèn)是64*1024個(gè)字節(jié)
});

到此,關(guān)于“Nodejs中的可讀流的作用和實(shí)現(xiàn)方法”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!

網(wǎng)頁(yè)名稱:Nodejs中的可讀流的作用和實(shí)現(xiàn)方法
URL標(biāo)題:http://bm7419.com/article4/pcdioe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)建站、云服務(wù)器、自適應(yīng)網(wǎng)站、網(wǎng)站內(nèi)鏈、面包屑導(dǎo)航標(biāo)簽優(yōu)化

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站網(wǎng)頁(yè)設(shè)計(jì)