From 65e16569a33974bf5071717557f6d5496b6b0c0e Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 14 Nov 2023 21:36:37 -0700 Subject: [PATCH 1/2] docs(pattern): add timeout section --- docs/gitbook/SUMMARY.md | 1 + docs/gitbook/patterns/timeout.md | 64 ++++++++++++++++++++++++++++++++ tests/test_worker.ts | 2 - 3 files changed, 65 insertions(+), 2 deletions(-) create mode 100644 docs/gitbook/patterns/timeout.md diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md index 3b353011fe..6137239660 100644 --- a/docs/gitbook/SUMMARY.md +++ b/docs/gitbook/SUMMARY.md @@ -60,6 +60,7 @@ * [Process Step Jobs](patterns/process-step-jobs.md) * [Failing fast when Redis is down](patterns/failing-fast-when-redis-is-down.md) * [Stop retrying jobs](patterns/stop-retrying-jobs.md) +* [Timeout](patterns/timeout.md) ## BullMQ Pro diff --git a/docs/gitbook/patterns/timeout.md b/docs/gitbook/patterns/timeout.md new file mode 100644 index 0000000000..e351b2d2ea --- /dev/null +++ b/docs/gitbook/patterns/timeout.md @@ -0,0 +1,64 @@ +# Timeout + +Sometimes, it is useful to timeout a processor function but you should be aware that async processes are not going to be cancelled immediately, even if you timeout a process, you need to validate that your process is in a cancelled state: + +```typescript +enum Step { + Initial, + Second, + Finish, +} + +const worker = new Worker( + 'queueName', + async job => { + let { step, timeout } = job.data; + let timeoutReached = false; + + setTimeout(() => { + timeoutReached = true; + }, timeout); + while (step !== Step.Finish) { + switch (step) { + case Step.Initial: { + await doInitialStepStuff(1000); + if (timeoutReached) { + throw new Error('Timeout'); + } + await job.updateData({ + step: Step.Second, + timeout, + }); + step = Step.Second; + break; + } + case Step.Second: { + await doSecondStepStuff(); + if (timeoutReached) { + throw new Error('Timeout'); + } + await job.updateData({ + step: Step.Finish, + timeout, + }); + step = Step.Finish; + return Step.Finish; + } + default: { + throw new Error('invalid step'); + } + } + } + }, + { connection }, +); +``` + +{% hint style="info" %} +It's better to split a long process into little functions/steps to be able to stop an execution by validating if we reach the timeout in each transition. +{% endhint %} + +## Read more: + +- 📋 [Process Step jobs](./process-step-jobs.md) +- 📋 [Cancellation by using Observables](../bullmq-pro/observables/cancelation.md) \ No newline at end of file diff --git a/tests/test_worker.ts b/tests/test_worker.ts index d45a547160..d294a4c26a 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -2577,13 +2577,11 @@ describe('workers', function () { setTimeout(() => { timeoutReached = true; }, timeout); - console.log(step, timeoutReached, job.data.timeout); while (step !== Step.Finish) { switch (step) { case Step.Initial: { await delay(1000); if (timeoutReached) { - console.log('reaached1'); throw new Error('Timeout'); } await job.updateData({ From 26b3b891f777ab69324f7670d18904b8af2d6a18 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 12 Mar 2024 23:45:06 -0500 Subject: [PATCH 2/2] test: add timeout cases --- docs/gitbook/patterns/timeout.md | 25 +++++--- src/utils.ts | 3 +- tests/test_worker.ts | 107 +++++++++++++++++++++++++++++++ 3 files changed, 125 insertions(+), 10 deletions(-) diff --git a/docs/gitbook/patterns/timeout.md b/docs/gitbook/patterns/timeout.md index e351b2d2ea..df9d40b29b 100644 --- a/docs/gitbook/patterns/timeout.md +++ b/docs/gitbook/patterns/timeout.md @@ -3,6 +3,8 @@ Sometimes, it is useful to timeout a processor function but you should be aware that async processes are not going to be cancelled immediately, even if you timeout a process, you need to validate that your process is in a cancelled state: ```typescript +import { AbortController } from 'node-abort-controller'; + enum Step { Initial, Second, @@ -13,18 +15,24 @@ const worker = new Worker( 'queueName', async job => { let { step, timeout } = job.data; - let timeoutReached = false; - setTimeout(() => { - timeoutReached = true; + const abortController = new AbortController(); + + const timeoutCall = setTimeout(() => { + abortController.abort(); }, timeout); + abortController.signal.addEventListener( + 'abort', + () => clearTimeout(timeoutCall), + { once: true }, + ); while (step !== Step.Finish) { switch (step) { case Step.Initial: { - await doInitialStepStuff(1000); - if (timeoutReached) { + if (abortController.signal.aborted) { throw new Error('Timeout'); } + await doInitialStepStuff(1000); await job.updateData({ step: Step.Second, timeout, @@ -34,7 +42,7 @@ const worker = new Worker( } case Step.Second: { await doSecondStepStuff(); - if (timeoutReached) { + if (abortController.signal.aborted) { throw new Error('Timeout'); } await job.updateData({ @@ -48,6 +56,7 @@ const worker = new Worker( throw new Error('invalid step'); } } + abortController.abort(); } }, { connection }, @@ -55,10 +64,10 @@ const worker = new Worker( ``` {% hint style="info" %} -It's better to split a long process into little functions/steps to be able to stop an execution by validating if we reach the timeout in each transition. +It's better to split a long process into little functions/steps to be able to stop an execution by validating if we reach the timeout in each transition using an AbortController instance. {% endhint %} ## Read more: - 📋 [Process Step jobs](./process-step-jobs.md) -- 📋 [Cancellation by using Observables](../bullmq-pro/observables/cancelation.md) \ No newline at end of file +- 📋 [Cancellation by using Observables](../bullmq-pro/observables/cancelation.md) diff --git a/src/utils.ts b/src/utils.ts index cceecb438c..24534e2440 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -58,12 +58,11 @@ export function delay( return new Promise(resolve => { let timeout: ReturnType | undefined; const callback = () => { - abortController?.signal.removeEventListener('abort', callback); clearTimeout(timeout); resolve(); }; timeout = setTimeout(callback, ms); - abortController?.signal.addEventListener('abort', callback); + abortController?.signal.addEventListener('abort', callback, { once: true }); }); } diff --git a/tests/test_worker.ts b/tests/test_worker.ts index ded040f315..f79c8df6c1 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -2,6 +2,7 @@ import { expect } from 'chai'; import { default as IORedis } from 'ioredis'; import { after, times } from 'lodash'; import { describe, beforeEach, it, before, after as afterAll } from 'mocha'; +import { AbortController } from 'node-abort-controller'; import * as sinon from 'sinon'; import { v4 } from 'uuid'; import { @@ -3425,6 +3426,112 @@ describe('workers', function () { }); }); + describe('when using timeout pattern', () => { + it('should stop processing until checking aborted signal', async function () { + this.timeout(8000); + + enum Step { + Initial, + Second, + Finish, + } + + const worker = new Worker( + queueName, + async job => { + let { step, timeout } = job.data; + const abortController = new AbortController(); + + const timeoutCall = setTimeout(() => { + abortController.abort(); + }, timeout); + abortController.signal.addEventListener( + 'abort', + () => clearTimeout(timeoutCall), + { once: true }, + ); + while (step !== Step.Finish) { + switch (step) { + case Step.Initial: { + if (abortController.signal.aborted) { + throw new Error('Timeout'); + } + await delay(1000); + await job.updateData({ + step: Step.Second, + timeout, + }); + step = Step.Second; + break; + } + case Step.Second: { + if (abortController.signal.aborted) { + throw new Error('Timeout'); + } + await delay(1000); + await job.updateData({ + step: Step.Finish, + timeout, + }); + step = Step.Finish; + return Step.Finish; + } + default: { + throw new Error('invalid step'); + } + } + } + abortController.abort(); + }, + { connection, prefix }, + ); + + await worker.waitUntilReady(); + + let start = Date.now(); + await queue.add('test', { step: Step.Initial, timeout: 3000 }); + + await new Promise(resolve => { + worker.once('completed', job => { + const elapse = Date.now() - start; + expect(job?.data.step).to.be.equal(Step.Finish); + expect(elapse).to.be.greaterThan(2000); + expect(elapse).to.be.lessThan(2500); + resolve(); + }); + }); + + start = Date.now(); + await queue.add('test', { step: Step.Initial, timeout: 1500 }); + + await new Promise(resolve => { + worker.once('completed', job => { + const elapse = Date.now() - start; + expect(job?.data.step).to.be.equal(Step.Finish); + expect(elapse).to.be.greaterThan(2000); + expect(elapse).to.be.lessThan(2500); + resolve(); + }); + }); + + start = Date.now(); + await queue.add('test', { step: Step.Initial, timeout: 500 }); + + await new Promise(resolve => { + worker.once('failed', (job, error) => { + const elapse = Date.now() - start; + expect(job?.data.step).to.be.equal(Step.Second); + expect(error.message).to.be.equal('Timeout'); + expect(elapse).to.be.greaterThan(1000); + expect(elapse).to.be.lessThan(1500); + resolve(); + }); + }); + + await worker.close(); + }); + }); + it('should not retry a job if the custom backoff returns -1', async () => { let tries = 0;