From 981be9db23e5b4920dca8a412ee9f16a07f83582 Mon Sep 17 00:00:00 2001 From: alexmorten Date: Wed, 4 Mar 2020 10:55:57 +0100 Subject: [PATCH] fix requestFinished fired multiple times --- src/helper/concurrentLimitRequester.ts | 26 +++++++++++++++---- test/helper/concurrentLimitRequester.mocha.js | 22 +++++++++++++--- 2 files changed, 39 insertions(+), 9 deletions(-) diff --git a/src/helper/concurrentLimitRequester.ts b/src/helper/concurrentLimitRequester.ts index a3affebb..af9e5f44 100644 --- a/src/helper/concurrentLimitRequester.ts +++ b/src/helper/concurrentLimitRequester.ts @@ -37,7 +37,6 @@ export function concurrentLimitRequesterFactory(parameters: ConcurrentLimitRe let requestQueue: QueueItem[] = []; let outstandingRequests: int = 0; - function requestFinished(): void { outstandingRequests--; if (!(requestQueue.length && outstandingRequests < concurrentLimit)) return; @@ -45,8 +44,11 @@ export function concurrentLimitRequesterFactory(parameters: ConcurrentLimitRe outstandingRequests++; const stream = requester(queueItem.request); - stream.on('error', requestFinished); - stream.on('end', requestFinished); + + const requestFinishedOnce = getOnceCallback(requestFinished); + stream.on('error', requestFinishedOnce); + stream.on('end', requestFinishedOnce); + pipeWithError(stream, queueItem.stream); } @@ -54,8 +56,11 @@ export function concurrentLimitRequesterFactory(parameters: ConcurrentLimitRe if (outstandingRequests < concurrentLimit) { outstandingRequests++; const stream = requester(request); - stream.on('error', requestFinished); - stream.on('end', requestFinished); + + const requestFinishedOnce = getOnceCallback(requestFinished); + stream.on('error', requestFinishedOnce); + stream.on('end', requestFinishedOnce); + return stream; } else { const stream = new PassThrough({ objectMode: true }); @@ -67,3 +72,14 @@ export function concurrentLimitRequesterFactory(parameters: ConcurrentLimitRe } }; } + +function getOnceCallback(callback: () => void) { + let called = false; + + return () => { + if (!called) { + called = true; + callback(); + } + }; +} diff --git a/test/helper/concurrentLimitRequester.mocha.js b/test/helper/concurrentLimitRequester.mocha.js index b0e2760c..9a070ffb 100644 --- a/test/helper/concurrentLimitRequester.mocha.js +++ b/test/helper/concurrentLimitRequester.mocha.js @@ -116,14 +116,20 @@ describe("Concurrent Limit Requester", () => { expect(e.message).to.equal('fail'); expect(nextQuery).to.equal('a'); nextQuery = 'b'; + expect(requester.hasQuery('c', 'has c')).to.equal(true); + expect(requester.hasQuery('d', 'has d')).to.equal(false); + + requester.reject('b'); }); let rb = toArray(concurrentLimitRequester({ query: 'b' })) .catch((e) => { expect(e.message).to.equal('fail'); expect(nextQuery).to.equal('b'); + + expect(requester.hasQuery('d', 'has d')).to.equal(true); + nextQuery = 'c'; - expect(requester.hasQuery('c', 'has c')).to.equal(true); requester.resolve('c'); }); @@ -131,14 +137,22 @@ describe("Concurrent Limit Requester", () => { .then((res) => { expect(res).to.deep.equal([1, 2, 3]); expect(nextQuery).to.equal('c'); + nextQuery = 'd'; + requester.resolve('d'); + }); + + let rd = toArray(concurrentLimitRequester({ query: 'd' })) + .then((res) => { + expect(res).to.deep.equal([1, 2, 3]); + expect(nextQuery).to.equal('d'); }); expect(requester.hasQuery('a'), 'has a').to.equal(true); expect(requester.hasQuery('b'), 'has b').to.equal(true); expect(requester.hasQuery('c'), 'has c').to.equal(false); + expect(requester.hasQuery('d'), 'has d').to.equal(false); requester.reject('a'); - requester.reject('b'); - return Promise.all([ra, rb, rc]); - }); + return Promise.all([ra, rb, rc, rd]); + }); });