国产99久久精品_欧美日本韩国一区二区_激情小说综合网_欧美一级二级视频_午夜av电影_日本久久精品视频

最新文章專題視頻專題問答1問答10問答100問答1000問答2000關鍵字專題1關鍵字專題50關鍵字專題500關鍵字專題1500TAG最新視頻文章推薦1 推薦3 推薦5 推薦7 推薦9 推薦11 推薦13 推薦15 推薦17 推薦19 推薦21 推薦23 推薦25 推薦27 推薦29 推薦31 推薦33 推薦35 推薦37視頻文章20視頻文章30視頻文章40視頻文章50視頻文章60 視頻文章70視頻文章80視頻文章90視頻文章100視頻文章120視頻文章140 視頻2關鍵字專題關鍵字專題tag2tag3文章專題文章專題2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章專題3
問答文章1 問答文章501 問答文章1001 問答文章1501 問答文章2001 問答文章2501 問答文章3001 問答文章3501 問答文章4001 問答文章4501 問答文章5001 問答文章5501 問答文章6001 問答文章6501 問答文章7001 問答文章7501 問答文章8001 問答文章8501 問答文章9001 問答文章9501
當前位置: 首頁 - 科技 - 知識百科 - 正文

Node.js pipe實現(xiàn)源碼解析

來源:懂視網(wǎng) 責編:小采 時間:2020-11-27 22:32:55
文檔

Node.js pipe實現(xiàn)源碼解析

Node.js pipe實現(xiàn)源碼解析:從前面兩篇文章,我們了解到。想要把 Readable 的數(shù)據(jù)寫到 Writable,就必須先手動的將數(shù)據(jù)讀入內存,然后寫入 Writable。換句話說,每次傳遞數(shù)據(jù)時,都需要寫如下的模板代碼 readable.on('readable', (err) => { if(err) thr
推薦度:
導讀Node.js pipe實現(xiàn)源碼解析:從前面兩篇文章,我們了解到。想要把 Readable 的數(shù)據(jù)寫到 Writable,就必須先手動的將數(shù)據(jù)讀入內存,然后寫入 Writable。換句話說,每次傳遞數(shù)據(jù)時,都需要寫如下的模板代碼 readable.on('readable', (err) => { if(err) thr

從前面兩篇文章,我們了解到。想要把 Readable 的數(shù)據(jù)寫到 Writable,就必須先手動的將數(shù)據(jù)讀入內存,然后寫入 Writable。換句話說,每次傳遞數(shù)據(jù)時,都需要寫如下的模板代碼

readable.on('readable', (err) => {
 if(err) throw err

 writable.write(readable.read())
})

為了方便使用,Node.js 提供了 pipe() 方法,讓我們可以優(yōu)雅的傳遞數(shù)據(jù)

readable.pipe(writable)

現(xiàn)在,就讓我們來看看它是如何實現(xiàn)的吧

pipe

首先需要先調用 Readable 的 pipe() 方法

// lib/_stream_readable.js

Readable.prototype.pipe = function(dest, pipeOpts) {
 var src = this;
 var state = this._readableState;

 // 記錄 Writable
 switch (state.pipesCount) {
 case 0:
 state.pipes = dest;
 break;
 case 1:
 state.pipes = [state.pipes, dest];
 break;
 default:
 state.pipes.push(dest);
 break;
 }
 state.pipesCount += 1;

 // ...

 src.once('end', endFn);

 dest.on('unpipe', onunpipe);
 
 // ...

 dest.on('drain', ondrain);

 // ...

 src.on('data', ondata);

 // ...

 // 保證 error 事件觸發(fā)時,onerror 首先被執(zhí)行
 prependListener(dest, 'error', onerror);

 // ...

 dest.once('close', onclose);
 
 // ...

 dest.once('finish', onfinish);

 // ...

 // 觸發(fā) Writable 的 pipe 事件
 dest.emit('pipe', src);

 // 將 Readable 改為 flow 模式
 if (!state.flowing) {
 debug('pipe resume');
 src.resume();
 }

 return dest;
};

執(zhí)行 pipe() 函數(shù)時,首先將 Writable 記錄到 state.pipes 中,然后綁定相關事件,最后如果 Readable 不是 flow 模式,就調用 resume() 將 Readable 改為 flow 模式

傳遞數(shù)據(jù)

Readable 從數(shù)據(jù)源獲取到數(shù)據(jù)后,觸發(fā) data 事件,執(zhí)行 ondata()

ondata() 相關代碼:

// lib/_stream_readable.js

 // 防止在 dest.write(chunk) 內調用 src.push(chunk) 造成 awaitDrain 重復增加,awaitDrain 不能清零,Readable 卡住的情況
 // 詳情見 https://github.com/nodejs/node/issues/7278
 var increasedAwaitDrain = false;
 function ondata(chunk) {
 debug('ondata');
 increasedAwaitDrain = false;
 var ret = dest.write(chunk);
 if (false === ret && !increasedAwaitDrain) {
 // 防止在 dest.write() 內調用 src.unpipe(dest),導致 awaitDrain 不能清零,Readable 卡住的情況
 if (((state.pipesCount === 1 && state.pipes === dest) ||
 (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)
 ) && 
 !cleanedUp) {
 debug('false write response, pause', src._readableState.awaitDrain);
 src._readableState.awaitDrain++;
 increasedAwaitDrain = true;
 }
 // 進入 pause 模式
 src.pause();
 }
 }

在 ondata(chunk) 函數(shù)內,通過 dest.write(chunk) 將數(shù)據(jù)寫入 Writable

此時,在 _write() 內部可能會調用 src.push(chunk) 或使其 unpipe,這會導致 awaitDrain 多次增加,不能清零,Readable 卡住

當不能再向 Writable 寫入數(shù)據(jù)時,Readable 會進入 pause 模式,直到所有的 drain 事件觸發(fā)

觸發(fā) drain 事件,執(zhí)行 ondrain()

// lib/_stream_readable.js

 var ondrain = pipeOnDrain(src);

 function pipeOnDrain(src) {
 return function() {
 var state = src._readableState;
 debug('pipeOnDrain', state.awaitDrain);
 if (state.awaitDrain)
 state.awaitDrain--;
 // awaitDrain === 0,且有 data 監(jiān)聽器
 if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
 state.flowing = true;
 flow(src);
 }
 };
 }

每個 drain 事件觸發(fā)時,都會減少 awaitDrain,直到 awaitDrain 為 0。此時,調用 flow(src),使 Readable 進入 flow 模式

到這里,整個數(shù)據(jù)傳遞循環(huán)已經(jīng)建立,數(shù)據(jù)會順著循環(huán)源源不斷的流入 Writable,直到所有數(shù)據(jù)寫入完成

unpipe

不管寫入過程中是否出現(xiàn)錯誤,最后都會執(zhí)行 unpipe()

// lib/_stream_readable.js

// ...

 function unpipe() {
 debug('unpipe');
 src.unpipe(dest);
 }

// ...

Readable.prototype.unpipe = function(dest) {
 var state = this._readableState;
 var unpipeInfo = { hasUnpiped: false };

 // 啥也沒有
 if (state.pipesCount === 0)
 return this;

 // 只有一個
 if (state.pipesCount === 1) {
 if (dest && dest !== state.pipes)
 return this;
 // 沒有指定就 unpipe 所有
 if (!dest)
 dest = state.pipes;

 state.pipes = null;
 state.pipesCount = 0;
 state.flowing = false;
 if (dest)
 dest.emit('unpipe', this, unpipeInfo);
 return this;
 }

 // 沒有指定就 unpipe 所有
 if (!dest) {
 var dests = state.pipes;
 var len = state.pipesCount;
 state.pipes = null;
 state.pipesCount = 0;
 state.flowing = false;

 for (var i = 0; i < len; i++)
 dests[i].emit('unpipe', this, unpipeInfo);
 return this;
 }

 // 找到指定 Writable,并 unpipe
 var index = state.pipes.indexOf(dest);
 if (index === -1)
 return this;

 state.pipes.splice(index, 1);
 state.pipesCount -= 1;
 if (state.pipesCount === 1)
 state.pipes = state.pipes[0];

 dest.emit('unpipe', this, unpipeInfo);

 return this;
};

Readable.prototype.unpipe() 函數(shù)會根據(jù) state.pipes 屬性和 dest 參數(shù),選擇執(zhí)行策略。最后會觸發(fā) dest 的 unpipe 事件

unpipe 事件觸發(fā)后,調用 onunpipe(),清理相關數(shù)據(jù)

// lib/_stream_readable.js

 function onunpipe(readable, unpipeInfo) {
 debug('onunpipe');
 if (readable === src) {
 if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
 unpipeInfo.hasUnpiped = true;
 // 清理相關數(shù)據(jù)
 cleanup();
 }
 }
 }

End

在整個 pipe 的過程中,Readable 是主動方 ( 負責整個 pipe 過程:包括數(shù)據(jù)傳遞、unpipe 與異常處理 ),Writable 是被動方 ( 只需要觸發(fā) drain 事件 )

總結一下 pipe 的過程:

  • 首先執(zhí)行 readbable.pipe(writable),將 readable 與 writable 對接上
  • 當 readable 中有數(shù)據(jù)時,readable.emit('data'),將數(shù)據(jù)寫入 writable
  • 如果 writable.write(chunk) 返回 false,則進入 pause 模式,等待 drain 事件觸發(fā)
  • drain 事件全部觸發(fā)后,再次進入 flow 模式,寫入數(shù)據(jù)
  • 不管數(shù)據(jù)寫入完成或發(fā)生中斷,最后都會調用 unpipe()
  • unpipe() 調用 Readable.prototype.unpipe(),觸發(fā) dest 的 unpipe 事件,清理相關數(shù)據(jù)
  • 參考:

    https://github.com/nodejs/node/blob/master/lib/_stream_readable.js

    https://github.com/nodejs/node/blob/master/lib/_stream_writable.js

    聲明:本網(wǎng)頁內容旨在傳播知識,若有侵權等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com

    文檔

    Node.js pipe實現(xiàn)源碼解析

    Node.js pipe實現(xiàn)源碼解析:從前面兩篇文章,我們了解到。想要把 Readable 的數(shù)據(jù)寫到 Writable,就必須先手動的將數(shù)據(jù)讀入內存,然后寫入 Writable。換句話說,每次傳遞數(shù)據(jù)時,都需要寫如下的模板代碼 readable.on('readable', (err) => { if(err) thr
    推薦度:
    • 熱門焦點

    最新推薦

    猜你喜歡

    熱門推薦

    專題
    Top
    主站蜘蛛池模板: 成人毛片国产a | 黄色毛片免费看 | 欧美精品第二页 | 国内精品在线视频 | 中文国产成人精品久久app | 三级网站免费播放国语 | 九九精品成人免费国产片 | 明星国产欧美日韩在线观看 | 国产 日韩 欧美在线 | 国产美女精品久久久久中文 | 日韩欧美亚洲综合 | 日本不卡一区二区三区 最新 | 久久人精品 | 亚洲第一页中文字幕 | 日本另类αv欧美另类aⅴ | 2021国产成人精品久久 | 国产精品一区二区三区四区五区 | 精品欧美一区二区三区在线 | 欧美人与禽交 | 在线观看视频一区二区三区 | 亚州色图欧美色图 | 亚欧在线| 可以免费观看的毛片 | 精品一区二区三区四区电影 | 香蕉视频你懂的 | 日本一区二区三区视频在线观看 | 日韩在线视频精品 | 久久精品无码一区二区日韩av | 亚欧美| 久久久久久综合一区中文字幕 | 欧美系列在线 | 九九精品视频一区在线 | 九九久久亚洲综合久久久 | 国内一级野外a一级毛片 | 91亚洲国产成人久久精品网址 | 亚洲va国产va欧美va综合 | 国产不卡视频在线 | 中文亚洲欧美日韩无线码 | 亚洲 自拍 另类 欧美 综合 | 亚洲一区二区三区精品视频 | 国产精品免费视频网站 |