node.js中stream流中可讀流和可寫流的實現(xiàn)與使用方法實例分析

本文實例講述了node.js中stream流中可讀流和可寫流的實現(xiàn)與使用方法。分享給大家供大家參考,具體如下:

公司主營業(yè)務(wù):網(wǎng)站設(shè)計制作、成都做網(wǎng)站、移動網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競爭能力。成都創(chuàng)新互聯(lián)公司是一支青春激揚、勤奮敬業(yè)、活力青春激揚、勤奮敬業(yè)、活力澎湃、和諧高效的團隊。公司秉承以“開放、自由、嚴謹、自律”為核心的企業(yè)文化,感謝他們對我們的高要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團隊有機會用頭腦與智慧不斷的給客戶帶來驚喜。成都創(chuàng)新互聯(lián)公司推出甘孜州免費做網(wǎng)站回饋大家。

node.js中的流 stream 是處理流式數(shù)據(jù)的抽象接口。node.js 提供了很多流對象,像http中的request和response,和 process.stdout 都是流的實例。

流可以是 可讀的,可寫的,或是可讀可寫的。所有流都是 events 的實例。

一、流的類型

node.js中有四種基本流類型:

1、Writable 可寫流 (例:fs.createWriteStream() )

2、Readable 可讀流 (例:fs.createReadStream() )

3、Duplex 可讀又可寫流 (例:net.Socket )

4、Transform 讀寫過程中可修改或轉(zhuǎn)換數(shù)據(jù)的 Duplex 流 (例:zlib.createDeflate() )

二、流中的數(shù)據(jù)有兩種模式

1、二進制模式,都是 string字符串  和 Buffer。

2、對象模式,流內(nèi)部處理的是一系統(tǒng)普通對象。

三、可讀流的兩種模式

1、流動模式 ( flowing ) ,數(shù)據(jù)自動從系統(tǒng)底層讀取,并通過事件,盡可能快地提供給應(yīng)用程序。

2、暫停模式 ( paused ),必須顯式的調(diào)用 read() 讀取數(shù)據(jù)。

可讀流 都開始于暫停模式,可以通過如下方法切換到流動模式:

1、添加 'data' 事件回調(diào)。

2、調(diào)用 resume()。

3、調(diào)用 pipe()。

可讀流通過如下方法切換回暫停模式:

1、如果沒有管道目標,調(diào)用 pause()。

2、如果有管道目標,移除所有管道目標,調(diào)用 unpipe() 移除多個管道目標。

四、創(chuàng)建可讀流,并監(jiān)聽事件

const fs = require('fs');
//創(chuàng)建一個文件可讀流
let rs = fs.createReadStream('./1.txt', {
  //文件系統(tǒng)標志
  flags: 'r',
  //數(shù)據(jù)編碼,如果調(diào)置了該參數(shù),則讀取的數(shù)據(jù)會自動解析
  //如果沒調(diào)置,則讀取的數(shù)據(jù)會是 Buffer
  //也可以通過 rs.setEncoding() 進行設(shè)置
  encoding: 'utf8',
  //文件描述符,默認為null
  fd: null,
  //文件權(quán)限
  mode: 0o666,
  //文件讀取的開始位置
  start: 0,
  //文件讀取的結(jié)束位置(包括結(jié)束位置)
  end: Infinity,
  //讀取緩沖區(qū)的大小,默認64K
  highWaterMark: 3
});
//文件被打開時觸發(fā)
rs.on('open', function () {
  console.log('文件打開');
});
//監(jiān)聽data事件,會讓當前流切換到流動模式
//當流中將數(shù)據(jù)傳給消費者后觸發(fā)
//由于我們在上面配置了 highWaterMark 為 3字節(jié),所以下面會打印多次。
rs.on('data', function (data) {
  console.log(data);
});
//流中沒有數(shù)據(jù)可供消費者時觸發(fā)
rs.on('end', function () {
  console.log('數(shù)據(jù)讀取完畢');
});
//讀取數(shù)據(jù)出錯時觸發(fā)
rs.on('error', function () {
  console.log('讀取錯誤');
});
//當文件被關(guān)閉時觸發(fā)
rs.on('close', function () {
  console.log('文件關(guān)閉');
});

注意,'open' 和 'close' 事件并不是所有流都會觸發(fā)。

當們監(jiān)聽'data'事件后,系統(tǒng)會盡可能快的讀取出數(shù)據(jù)。但有時候,我們需要暫停一下流的讀取,操作其他事情。

這時候就需要用到 pause() 和 resume() 方法。

const fs = require('fs');
//創(chuàng)建一個文件可讀流
let rs = fs.createReadStream('./1.txt', {
  highWaterMark: 3
});
rs.on('data', function (data) {
  console.log(`讀取了 ${data.length} 字節(jié)數(shù)據(jù) : ${data.toString()}`);
  //使流動模式的流停止觸發(fā)'data'事件,切換出流動模式,數(shù)據(jù)都會保留在內(nèi)部緩存中。
  rs.pause();
  //等待3秒后,再恢復觸發(fā)'data'事件,將流切換回流動模式。
  setTimeout(function () {
    rs.resume();
  }, 3000);
});

可讀流的 'readable' 事件,當流中有數(shù)據(jù)可供讀取時就觸發(fā)。

注意當監(jiān)聽 'readable' 事件后,會導致流停止流動,需調(diào)用 read() 方法讀取數(shù)據(jù)。

注意 on('data'),on('readable'),pipe() 不要混合使用,會導致不明確的行為。

const fs = require('fs');
let rs = fs.createReadStream('./1.txt', {
  highWaterMark: 1
});
//當流中有數(shù)據(jù)可供讀取時就觸發(fā)
rs.on('readable', function () {
  let data;
  //循環(huán)讀取數(shù)據(jù)
  //參數(shù)表示要讀取的字節(jié)數(shù)
  //如果可讀的數(shù)據(jù)不足字節(jié)數(shù),則返回緩沖區(qū)剩余數(shù)據(jù)
  //如是沒有指定字節(jié)數(shù),則返回緩沖區(qū)中所有數(shù)據(jù)
  while (data = rs.read()) {
    console.log(`讀取到 ${data.length} 字節(jié)數(shù)據(jù)`);
    console.log(data.toString());
  }
});

五、創(chuàng)建可寫流,并監(jiān)聽事件

const fs = require('fs');
//創(chuàng)建一個文件可寫流
let ws = fs.createWriteStream('./1.txt', {
  highWaterMark: 3
});
//往流中寫入數(shù)據(jù)
//參數(shù)一表示要寫入的數(shù)據(jù)
//參數(shù)二表示編碼方式
//參數(shù)三表示寫入成功的回調(diào)
//緩沖區(qū)滿時返回false,未滿時返回true。
//由于上面我們設(shè)置的緩沖區(qū)大小為 3字節(jié),所以到寫入第3個時,就返回了false。
console.log(ws.write('1', 'utf8'));
console.log(ws.write('2', 'utf8'));
console.log(ws.write('3', 'utf8'));
console.log(ws.write('4', 'utf8'));
function writeData() {
  let cnt = 9;
  return function () {
    let flag = true;
    while (cnt && flag) {
      flag = ws.write(`${cnt}`);
      console.log('緩沖區(qū)中寫入的字節(jié)數(shù)', ws.writableLength);
      cnt--;
    }
  };
}
let wd = writeData();
wd();
//當緩沖區(qū)中的數(shù)據(jù)滿的時候,應(yīng)停止寫入數(shù)據(jù),
//一旦緩沖區(qū)中的數(shù)據(jù)寫入文件了,并清空了,則會觸發(fā) 'drain' 事件,告訴生產(chǎn)者可以繼續(xù)寫數(shù)據(jù)了。
ws.on('drain', function () {
  console.log('可以繼續(xù)寫數(shù)據(jù)了');
  console.log('緩沖區(qū)中寫入的字節(jié)數(shù)', ws.writableLength);
  wd();
});
//當流或底層資源關(guān)閉時觸發(fā)
ws.on('close', function () {
  console.log('文件被關(guān)閉');
});
//當寫入數(shù)據(jù)出錯時觸發(fā)
ws.on('error', function () {
  console.log('寫入數(shù)據(jù)錯誤');
});

寫入流的 end() 方法 和 'finish' 事件監(jiān)聽

const fs = require('fs');
//創(chuàng)建一個文件可寫流
let ws = fs.createWriteStream('./1.txt', {
  highWaterMark: 3
});
//往流中寫入數(shù)據(jù)
//參數(shù)一表示要寫入的數(shù)據(jù)
//參數(shù)二表示編碼方式
//參數(shù)三表示寫入成功的回調(diào)
//緩沖區(qū)滿時返回false,未滿時返回true。
//由于上面我們設(shè)置的緩沖區(qū)大小為 3字節(jié),所以到寫入第3個時,就返回了false。
console.log(ws.write('1', 'utf8'));
console.log(ws.write('2', 'utf8'));
console.log(ws.write('3', 'utf8'));
console.log(ws.write('4', 'utf8'));
//調(diào)用end()表明已經(jīng)沒有數(shù)據(jù)要被寫入,在關(guān)閉流之前再寫一塊數(shù)據(jù)。
//如果傳入了回調(diào)函數(shù),則將作為 'finish' 事件的回調(diào)函數(shù)
ws.end('最后一點數(shù)據(jù)', 'utf8');
//調(diào)用 end() 且緩沖區(qū)數(shù)據(jù)都已傳給底層系統(tǒng)時觸發(fā)
ws.on('finish', function () {
  console.log('寫入完成');
});

寫入流的 cork() 和 uncork() 方法,主要是為了解決大量小塊數(shù)據(jù)寫入時,內(nèi)部緩沖可能失效,導致的性能下降。

const fs = require('fs');
let ws = fs.createWriteStream('./1.txt', {
  highWaterMark: 1
});
//調(diào)用 cork() 后,會強制把所有寫入的數(shù)據(jù)緩沖到內(nèi)存中。
//不會因為寫入的數(shù)據(jù)超過了 highWaterMark 的設(shè)置而寫入到文件中。
ws.cork();
ws.write('1');
console.log(ws.writableLength);
ws.write('2');
console.log(ws.writableLength);
ws.write('3');
console.log(ws.writableLength);
//將調(diào)用 cork() 后的緩沖數(shù)據(jù)都輸出到目標,也就是寫入文件中。
ws.uncork();

注意 cork() 的調(diào)用次數(shù)要與 uncork() 一致。

const fs = require('fs');
let ws = fs.createWriteStream('./1.txt', {
  highWaterMark: 1
});
//調(diào)用一次 cork() 就應(yīng)該寫一次 uncork(),兩者要一一對應(yīng)。
ws.cork();
ws.write('4');
ws.write('5');
ws.cork();
ws.write('6');
process.nextTick(function () {
  //注意這里只調(diào)用了一次 uncork()
  ws.uncork();
  //只有調(diào)用同樣次數(shù)的 uncork() 數(shù)據(jù)才會被輸出。
  ws.uncork();
});

六、可讀流的 pipe() 方法

pipe() 方法類似下面的代碼,在可讀流與可寫流之前架起一座橋梁。

const fs = require('fs');
//創(chuàng)建一個可讀流
let rs = fs.createReadStream('./1.txt', {
  highWaterMark: 3
});
//創(chuàng)建一個可寫流
let ws = fs.createWriteStream('./2.txt', {
  highWaterMark: 3
});
rs.on('data', function (data) {
  let flag = ws.write(data);
  console.log(`往可寫流中寫入 ${data.length} 字節(jié)數(shù)據(jù)`);
  //如果寫入緩沖區(qū)已滿,則暫??勺x流的讀取
  if (!flag) {
    rs.pause();
    console.log('暫停可讀流');
  }
});
//監(jiān)控可讀流數(shù)據(jù)是否讀完
rs.on('end', function () {
  console.log('數(shù)據(jù)已讀完');
  //如果可讀流讀完了,則調(diào)用 end() 表示可寫流已寫入完成
  ws.end();
});
//如果可寫流緩沖區(qū)已清空,可以再次寫入,則重新打開可讀流
ws.on('drain', function () {
  rs.resume();
  console.log('重新開啟可讀流');
});

我們用 pipe() 方法完成上面的功能。

const fs = require('fs');
//創(chuàng)建一個可讀流
let rs = fs.createReadStream('./1.txt', {
  highWaterMark: 3
});
//創(chuàng)建一個可寫流
let ws = fs.createWriteStream('./2.txt', {
  highWaterMark: 3
});
let ws2 = fs.createWriteStream('./3.txt', {
  highWaterMark: 3
});
//綁定可寫流到可讀流,自動將可讀流切換到流動模式,將可讀流的所有數(shù)據(jù)推送到可寫流。
rs.pipe(ws);
//可以綁定多個可寫流
rs.pipe(ws2);

我們也可以用 unpipe() 手動的解綁可寫流。

const fs = require('fs');
//創(chuàng)建一個可讀流
let rs = fs.createReadStream('./1.txt', {
  highWaterMark: 3
});
//創(chuàng)建一個可寫流
let ws = fs.createWriteStream('./2.txt', {
  highWaterMark: 3
});
let ws2 = fs.createWriteStream('./3.txt', {
  highWaterMark: 3
});
rs.pipe(ws);
rs.pipe(ws2);
//解綁可寫流,如果參數(shù)沒寫,則解綁所有管道
setTimeout(function () {
  rs.unpipe(ws2);
}, 0);

希望本文所述對大家node.js程序設(shè)計有所幫助。

網(wǎng)頁名稱:node.js中stream流中可讀流和可寫流的實現(xiàn)與使用方法實例分析
網(wǎng)頁網(wǎng)址:http://bm7419.com/article2/gegdoc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供做網(wǎng)站、動態(tài)網(wǎng)站網(wǎng)站排名、軟件開發(fā)、網(wǎng)站策劃、手機網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

手機網(wǎng)站建設(shè)