diff --git a/lib/queue.js b/lib/queue.js index 2a2f6e2..66d4377 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -9,6 +9,7 @@ function Queue(concurrency) { this.processTimeout = 0; this.throttleCount = 0; this.throttleInterval = 1000; + this.throttleTimer = null; this.count = 0; this.tasks = []; this.waiting = []; @@ -64,7 +65,7 @@ Queue.prototype.add = function (item, factor = 0, priority = 0) { const task = [item, factor, priority]; const slot = this.count < this.concurrency; if (!this.paused && slot && this.onProcess) { - this.next(task); + this.execute(task); return this; } let tasks; @@ -92,6 +93,15 @@ Queue.prototype.add = function (item, factor = 0, priority = 0) { return this; }; +// Execute task +// task - , next task [item, priority] +// +// Returns: +Queue.prototype.execute = function (task) { + if (this.waitTimeout) setTimeout(() => this.next(task), this.waitTimeout); + else this.next(task); +}; + // Process next item // task - , next task [item, factor, priority] // @@ -143,8 +153,37 @@ Queue.prototype.takeNext = function () { } else { tasks = this.tasks; } - const task = tasks.shift(); - if (task) this.next(task); + if (this.throttleCount) { + let shouldStartTask = false; + + const startTask = () => { + const available = this.throttleCount - this.count; + if (available > 0) { + const allowed = Math.min(this.concurrency, available); + for (let i = 0; i < allowed; i++) { + const task = this.tasks.shift(); + if (task) { + this.execute(task); + } + } + } + }; + + const delayed = () => { + this.throttleTimer = null; + if (shouldStartTask) startTask(); + }; + + if (!this.throttleTimer) { + this.throttleTimer = setTimeout(delayed, this.throttleInterval); + shouldStartTask = false; + startTask(); + } + shouldStartTask = true; + } else { + const task = this.tasks.shift(); + this.execute(task); + } return this; };