最近接了一個需求,很簡單,就是起一個server,收到請求時調(diào)用某個提供好的接口,然后把結(jié)果返回。因為這個接口的性能問題,同時在請求的不能超過特定數(shù)目,要在服務(wù)中進(jìn)行限流。
限流的要求是,限制同時執(zhí)行的數(shù)目,超出這個數(shù)目后要在一個隊列中進(jìn)行緩存。
koa 中間件不調(diào)用 next
最初的想法是在 koa 中間件中進(jìn)行計數(shù),超過6個時將next函數(shù)緩存下來。等正在進(jìn)行中的任務(wù)結(jié)束時,調(diào)用next繼續(xù)其他請求。
之后發(fā)現(xiàn) koa 中間件中,不執(zhí)行next函數(shù)請求并不會停下,而是不再調(diào)用之后的中間件,直接返回內(nèi)容。
const Koa = require('koa'); const app = new Koa(); app.use((ctx, next) => { console.log('middleware 1'); setTimeout(() => { next(); }, 3000); ctx.body = 'hello'; }); app.use((ctx, next) => { console.log('middleware 2'); }); app.listen(8989);
以上代碼首先在控制臺打出 ‘middleware 1' => 瀏覽器收到 ‘hello' => 控制臺打出 ‘middleware 2'。
這里還有一個要注意的地方,就是一個請求已經(jīng)結(jié)束(finish)后,他的next方法還是可以繼續(xù)調(diào)用,之后的middleware還是繼續(xù)運行的(但是對ctx的修改不會生效,因為請求已經(jīng)返回了)。同樣,關(guān)閉的請求(close)也是同樣的表現(xiàn)。
使用 await 讓請求進(jìn)行等待
延遲next函數(shù)執(zhí)行不能達(dá)到目的。接下來自然想到的就是使用await讓當(dāng)前請求等待。await的函數(shù)返回一個Promise,我們將這個Promise中的resolve函數(shù)存儲到隊列中,延遲調(diào)用。
const Koa = require('koa'); const app = new Koa(); const queue = []; app.use(async (ctx, next) => { setTimeout(() => { queue.shift()(); }, 3000); await delay(); ctx.body = 'hello'; }); function delay() { return new Promise((resolve, reject) => { queue.push(resolve); }); } app.listen(8989);
上面這段代碼,在delay函數(shù)中返回一個Promise,Promise的resolve函數(shù)存入隊列中。設(shè)置定時3s后執(zhí)行隊列中的resolve函數(shù),使請求繼續(xù)執(zhí)行。
針對路由進(jìn)行限流,還是針對請求進(jìn)行限流?
限流的基本原理實現(xiàn)后,下面一個問題就是限流代碼該寫在哪里?基本上,有兩個位置:
針對接口進(jìn)行限流
由于我們的需求中,限流是因為要請求接口的性能有限。所以我們可以單獨針對這個請求進(jìn)行限流:
async function requestSomeApi() { // 如果已經(jīng)超過了最大并發(fā) if (counter > maxAllowedRequest) { await delay(); } counter++; const result = await request('http://some.api'); counter--; queue.shift()(); return result; }
下面還有一個方便復(fù)用的版本。
async function limitWrapper(func, maxAllowedRequest) { const queue = []; const counter = 0; return async function () { if (counter > maxAllowedRequest) { await new Promise((resolve, reject) => { queue.push(resolve); }); } counter++; const result = await func(); counter--; queue.shift()(); return result; } }
針對路由進(jìn)行限流
這種方式是寫一個koa中間件,在中間件中進(jìn)行限流:
async function limiter(ctx, next) => { // 如果超過了最大并發(fā)數(shù)目 if (counter >= maxAllowedRequest) { // 如果當(dāng)前隊列中已經(jīng)過長 await new Promise((resolve, reject) => { queue.push(resolve); }); } store.counter++; await next(); store.counter--; queue.shift()(); };
之后針對不同路由在router中使用這個中間件就好了:
router.use('/api', rateLimiter);
比較
實現(xiàn)了針對接口進(jìn)行限流,覺得邏輯有些亂,于是改用了針對路由進(jìn)行限流,一切運行的很完美。
直到我又接了個需求,是要請求三次這個接口返回三次請求的結(jié)果數(shù)組?,F(xiàn)在問題來了,我們不能直接調(diào)用接口,因為要限流。也不能直接調(diào)用請求接口的函數(shù)因為我們的限流是以路由為單位的。那怎么辦呢?我們只有請求這個路由了,自己請求自己。。
需要注意的地方
監(jiān)聽close事件,將請求從隊列中移出
已經(jīng)存儲在隊列中的請求,有可能遇到用戶取消的情況。前面說過koa中即使請求取消,之后的中間件還是會運行,也就是還會執(zhí)行需要限流的接口,造成浪費。
可以監(jiān)聽close事件來達(dá)到這個目的,每個請求我們需要用hash值來標(biāo)記:
ctx.res.on('close', () => { const index = queue.findIndex(item => item.hash === hash); if (index > -1) { queue.splice(index, 1); } });
設(shè)置超時時間
為了防止用戶等待過長時間,需要設(shè)置超時時間,這在koa中很容易實現(xiàn):
const server = app.listen(config.port); server.timeout = DEFAULT_TIMEOUT;
當(dāng)前隊列已經(jīng)過長
如果當(dāng)前隊列已經(jīng)過長了,即使加入隊列中也會超時。因此我們還需要處理隊列過長的情況:
if (queue.length > maxAllowedRequest) { ctx.body = 'error message'; return; }
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com