diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md index 8cb60e381f..e75f5054ee 100644 --- a/docs/gitbook/SUMMARY.md +++ b/docs/gitbook/SUMMARY.md @@ -71,6 +71,7 @@ * [Process Step Jobs](patterns/process-step-jobs.md) * [Failing fast when Redis is down](patterns/failing-fast-when-redis-is-down.md) * [Stop retrying jobs](patterns/stop-retrying-jobs.md) +* [Timeout](patterns/timeout.md) * [Redis Cluster](patterns/redis-cluster.md) ## BullMQ Pro diff --git a/docs/gitbook/patterns/timeout.md b/docs/gitbook/patterns/timeout.md new file mode 100644 index 0000000000..df9d40b29b --- /dev/null +++ b/docs/gitbook/patterns/timeout.md @@ -0,0 +1,73 @@ +# Timeout + +Sometimes, it is useful to timeout a processor function but you should be aware that async processes are not going to be cancelled immediately, even if you timeout a process, you need to validate that your process is in a cancelled state: + +```typescript +import { AbortController } from 'node-abort-controller'; + +enum Step { + Initial, + Second, + Finish, +} + +const worker = new Worker( + 'queueName', + async job => { + let { step, timeout } = job.data; + + const abortController = new AbortController(); + + const timeoutCall = setTimeout(() => { + abortController.abort(); + }, timeout); + abortController.signal.addEventListener( + 'abort', + () => clearTimeout(timeoutCall), + { once: true }, + ); + while (step !== Step.Finish) { + switch (step) { + case Step.Initial: { + if (abortController.signal.aborted) { + throw new Error('Timeout'); + } + await doInitialStepStuff(1000); + await job.updateData({ + step: Step.Second, + timeout, + }); + step = Step.Second; + break; + } + case Step.Second: { + await doSecondStepStuff(); + if (abortController.signal.aborted) { + throw new Error('Timeout'); + } + await job.updateData({ + step: Step.Finish, + timeout, + }); + step = Step.Finish; + return Step.Finish; + } + default: { + throw new Error('invalid step'); + } + } + abortController.abort(); + } + }, + { connection }, +); +``` + +{% hint style="info" %} +It's better to split a long process into little functions/steps to be able to stop an execution by validating if we reach the timeout in each transition using an AbortController instance. +{% endhint %} + +## Read more: + +- 📋 [Process Step jobs](./process-step-jobs.md) +- 📋 [Cancellation by using Observables](../bullmq-pro/observables/cancelation.md) diff --git a/src/utils.ts b/src/utils.ts index cceecb438c..24534e2440 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -58,12 +58,11 @@ export function delay( return new Promise(resolve => { let timeout: ReturnType | undefined; const callback = () => { - abortController?.signal.removeEventListener('abort', callback); clearTimeout(timeout); resolve(); }; timeout = setTimeout(callback, ms); - abortController?.signal.addEventListener('abort', callback); + abortController?.signal.addEventListener('abort', callback, { once: true }); }); } diff --git a/tests/test_worker.ts b/tests/test_worker.ts index ded040f315..f79c8df6c1 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -2,6 +2,7 @@ import { expect } from 'chai'; import { default as IORedis } from 'ioredis'; import { after, times } from 'lodash'; import { describe, beforeEach, it, before, after as afterAll } from 'mocha'; +import { AbortController } from 'node-abort-controller'; import * as sinon from 'sinon'; import { v4 } from 'uuid'; import { @@ -3425,6 +3426,112 @@ describe('workers', function () { }); }); + describe('when using timeout pattern', () => { + it('should stop processing until checking aborted signal', async function () { + this.timeout(8000); + + enum Step { + Initial, + Second, + Finish, + } + + const worker = new Worker( + queueName, + async job => { + let { step, timeout } = job.data; + const abortController = new AbortController(); + + const timeoutCall = setTimeout(() => { + abortController.abort(); + }, timeout); + abortController.signal.addEventListener( + 'abort', + () => clearTimeout(timeoutCall), + { once: true }, + ); + while (step !== Step.Finish) { + switch (step) { + case Step.Initial: { + if (abortController.signal.aborted) { + throw new Error('Timeout'); + } + await delay(1000); + await job.updateData({ + step: Step.Second, + timeout, + }); + step = Step.Second; + break; + } + case Step.Second: { + if (abortController.signal.aborted) { + throw new Error('Timeout'); + } + await delay(1000); + await job.updateData({ + step: Step.Finish, + timeout, + }); + step = Step.Finish; + return Step.Finish; + } + default: { + throw new Error('invalid step'); + } + } + } + abortController.abort(); + }, + { connection, prefix }, + ); + + await worker.waitUntilReady(); + + let start = Date.now(); + await queue.add('test', { step: Step.Initial, timeout: 3000 }); + + await new Promise(resolve => { + worker.once('completed', job => { + const elapse = Date.now() - start; + expect(job?.data.step).to.be.equal(Step.Finish); + expect(elapse).to.be.greaterThan(2000); + expect(elapse).to.be.lessThan(2500); + resolve(); + }); + }); + + start = Date.now(); + await queue.add('test', { step: Step.Initial, timeout: 1500 }); + + await new Promise(resolve => { + worker.once('completed', job => { + const elapse = Date.now() - start; + expect(job?.data.step).to.be.equal(Step.Finish); + expect(elapse).to.be.greaterThan(2000); + expect(elapse).to.be.lessThan(2500); + resolve(); + }); + }); + + start = Date.now(); + await queue.add('test', { step: Step.Initial, timeout: 500 }); + + await new Promise(resolve => { + worker.once('failed', (job, error) => { + const elapse = Date.now() - start; + expect(job?.data.step).to.be.equal(Step.Second); + expect(error.message).to.be.equal('Timeout'); + expect(elapse).to.be.greaterThan(1000); + expect(elapse).to.be.lessThan(1500); + resolve(); + }); + }); + + await worker.close(); + }); + }); + it('should not retry a job if the custom backoff returns -1', async () => { let tries = 0;