fix(queue): remove repeat instance in favor of job-scheduler #2928

Open · wants to merge 23 commits into base: master

Commits (23)
99c0161
fix(flow): fail parent on failure by default (#2682)
roggervalf Aug 14, 2024
bdd2387
Merge branch 'master' into v6
roggervalf Aug 17, 2024
55854d4
Merge branch 'master' into v6
roggervalf Aug 20, 2024
8542941
feat: add onChildFailure option (#2710)
roggervalf Aug 23, 2024
6e2bd15
fix(repeat): remove legacy cron option (#2729)
roggervalf Aug 24, 2024
dbef80d
perf: remove old marker logic (#2730)
roggervalf Sep 10, 2024
8eed71d
Merge branch 'master' into v6
roggervalf Sep 12, 2024
29e454a
fix(backoff): change optional BackoffStrategy params to use union (#2…
roggervalf Sep 17, 2024
8706b47
Merge branch 'master' into v6
roggervalf Sep 21, 2024
4ba7f6d
chore: merge branch 'master' into v6
roggervalf Sep 27, 2024
bae2024
chore: fix merge conflicts
roggervalf Oct 7, 2024
8617fb5
Merge branch 'master' into v6
roggervalf Oct 12, 2024
f7fa9ce
chore: merge branch 'master' into v6
roggervalf Oct 16, 2024
ea7462f
chore: fix merge conflicts
roggervalf Nov 13, 2024
eb1f9cb
chore: fix merge conflicts
roggervalf Nov 14, 2024
b5ebcb2
Merge branch 'master' into v6
roggervalf Nov 19, 2024
d304267
perf: keep jobs in waiting list when queue is paused (#2769)
roggervalf Nov 19, 2024
c7c827f
fix(worker): remove rateLimit method in favor of queue (#2921)
roggervalf Nov 22, 2024
3b451d2
fix(worker): remove blockingConnection option (#2922)
roggervalf Nov 22, 2024
10bf0fb
fix(job): remove debounce option in favor of deduplication (#2924)
roggervalf Nov 22, 2024
ea157a8
fix(queue): remove repeat instance in favor of job-scheduler
roggervalf Nov 23, 2024
b31863a
Merge branch 'master' into remove-repeat
roggervalf Nov 23, 2024
97e3a65
chore: remove repeat from exports
roggervalf Nov 23, 2024
2 changes: 1 addition & 1 deletion README.md
@@ -228,7 +228,7 @@ Since there are a few job queue solutions, here is a table comparing them:
| Group Support | ✓ | | | | | |
| Batches Support | ✓ | | | | | |
| Parent/Child Dependencies | ✓ | ✓ | | | | |
| Debouncing | ✓ | ✓ | ✓ | | | |
| Deduplication | ✓ | ✓ | ✓ | | | |
| Priorities | ✓ | ✓ | ✓ | ✓ | | ✓ |
| Concurrency | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| Delayed jobs | ✓ | ✓ | ✓ | ✓ | | ✓ |
3 changes: 2 additions & 1 deletion docs/gitbook/SUMMARY.md
@@ -70,7 +70,8 @@
- [Producers](guide/nestjs/producers.md)
- [Queue Events Listeners](guide/nestjs/queue-events-listeners.md)
- [Going to production](guide/going-to-production.md)
- [Migration to newer versions](guide/migration-to-newer-versions.md)
- [Migration to newer versions](guide/migrations/migration-to-newer-versions.md)
- [Version 6](guide/migrations/v6.md)
- [Troubleshooting](guide/troubleshooting.md)

## Patterns
6 changes: 3 additions & 3 deletions docs/gitbook/guide/flows/fail-parent.md
@@ -2,7 +2,7 @@

In some situations, you may need to fail a job when _one of its children_ fails.

The pattern to solve this requirement consists of using the **`failParentOnFailure`** option.
The pattern to solve this requirement consists of using the **`onChildFailure`** option set to **`fail`**, which is also the default behavior.

```typescript
const flow = new FlowProducer({ connection });
@@ -16,13 +16,13 @@ const originalTree = await flow.add({
name,
data: { idx: 0, foo: 'bar' },
queueName: 'childrenQueueName',
opts: { failParentOnFailure: true },
opts: { onChildFailure: 'fail' },
children: [
{
name,
data: { idx: 1, foo: 'bah' },
queueName: 'grandChildrenQueueName',
opts: { failParentOnFailure: true },
opts: { onChildFailure: 'fail' },
},
{
name,
4 changes: 2 additions & 2 deletions docs/gitbook/guide/flows/ignore-dependency.md
@@ -2,7 +2,7 @@

In some situations, you may have a parent job and need to ignore the failure of one of its children.

The pattern to solve this requirement consists on using the **ignoreDependencyOnFailure** option. This option will make sure that when a job fails, the dependency is ignored from the parent, so the parent will complete without waiting for the failed children.
The pattern to solve this requirement consists of using the **onChildFailure** option set to **ignore**. This option makes sure that when a child job fails, its dependency is ignored by the parent, so the parent completes without waiting for the failed child.

```typescript
const flow = new FlowProducer({ connection });
@@ -16,7 +16,7 @@ const originalTree = await flow.add({
name,
data: { idx: 0, foo: 'bar' },
queueName: 'childrenQueueName',
opts: { ignoreDependencyOnFailure: true },
opts: { onChildFailure: 'ignore' },
children: [
{
name,
4 changes: 2 additions & 2 deletions docs/gitbook/guide/flows/remove-dependency.md
@@ -2,7 +2,7 @@

In some situations, you may have a parent job and need to remove the relationship when one of its children fails.

The pattern to solve this requirement consists on using the **removeDependencyOnFailure** option. This option will make sure that when a job fails, the dependency is removed from the parent, so the parent will complete without waiting for the failed children.
The pattern to solve this requirement consists of using the **onChildFailure** option set to **remove**. This option makes sure that when a child job fails, its dependency is removed from the parent, so the parent completes without waiting for the failed child.

```typescript
const flow = new FlowProducer({ connection });
@@ -16,7 +16,7 @@ const originalTree = await flow.add({
name,
data: { idx: 0, foo: 'bar' },
queueName: 'childrenQueueName',
opts: { removeDependencyOnFailure: true },
opts: { onChildFailure: 'remove' },
children: [
{
name,
@@ -55,4 +55,3 @@ Since BullMQ supports global pause, one possible strategy, if suitable for your
### Use new queues altogether

This drastic solution involves discontinuing use of older queues and creating new ones. You could rename older queues (e.g., "myQueueV2"), use a new Redis host, or maintain two versions of the service—one running an older BullMQ version with old queues, and a newer one with the latest BullMQ and a different set of queues. When the older version has no more jobs to process, it can be retired, leaving only the upgraded version.

25 changes: 25 additions & 0 deletions docs/gitbook/guide/migrations/v6.md
@@ -0,0 +1,25 @@
---
description: Tips and hints on how to migrate to v6.
---

# Migration to v6

Make sure to call the **runMigrations** method of the Queue class in order to execute all necessary changes when coming from an older version.

## Migration of deprecated paused key

If you have paused queues when upgrading to this version, their jobs will be moved to the wait state when any of our instances (Worker, Queue, QueueEvents or FlowProducer) is initialized.

The paused key is no longer needed, as this state is already represented inside the meta key. This also improves the process of pausing or resuming a queue, since we do not need to rename any key.

## Remove legacy markers

When migrating from versions older than v5, it is recommended to follow this process:

1. Pause your queues.
2. Upgrade to v6.
3. Instantiate a Queue instance and execute the runMigrations method, which will run the pending migrations.
4. Resume your queues.

This way you will prevent your workers from picking up a legacy marker that is no longer used, since new markers are added with a different structure.
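As a sketch, steps 3 and 4 above might look like the following (assuming a local Redis connection; the queue name and connection options are placeholders, and `runMigrations` is the method mentioned above):

```typescript
import { Queue } from 'bullmq';

// Hypothetical connection options; adjust to your environment.
const connection = { host: 'localhost', port: 6379 };

async function migrateQueueToV6(queueName: string) {
  // Step 3: instantiate a Queue on BullMQ v6 and run the migrations.
  const queue = new Queue(queueName, { connection });
  await queue.runMigrations();

  // Step 4: resume the queue that was paused before upgrading.
  await queue.resume();
  await queue.close();
}
```

Step 1 (pausing) would be issued from the old version before upgrading, e.g. `await queue.pause()`.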
12 changes: 7 additions & 5 deletions docs/gitbook/guide/rate-limiting.md
@@ -62,21 +62,23 @@ await queue.add('rate limited paint', { customerId: 'my-customer-id' });

Sometimes it is useful to rate-limit a queue manually instead of based on some static options. For example, you may have an API that returns `429 Too Many Requests`, and you want to rate-limit the queue based on that response.

For this purpose, you can use the worker method **`rateLimit`** like this:
For this purpose, you can use the queue method **`rateLimit`** like this:

```typescript
import { Worker } from 'bullmq';
import { Queue, RateLimitError, Worker } from 'bullmq';

const queue = new Queue('myQueue', { connection });

const worker = new Worker(
'myQueue',
async () => {
const [isRateLimited, duration] = await doExternalCall();
if (isRateLimited) {
await worker.rateLimit(duration);
await queue.rateLimit(duration);
// Do not forget to throw this special exception,
// since we must differentiate this case from a failure
// in order to move the job to wait again.
throw Worker.RateLimitError();
throw new RateLimitError();
}
},
{
@@ -130,6 +132,6 @@ By removing rate limit key, workers will be able to pick jobs again and your rat

## Read more:

- 💡 [Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Worker.html#rateLimit)
- 💡 [Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#rateLimit)
- 💡 [Get Rate Limit Ttl API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#getRateLimitTtl)
- 💡 [Remove Rate Limit Key API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#removeRateLimitKey)
10 changes: 6 additions & 4 deletions docs/gitbook/patterns/stop-retrying-jobs.md
@@ -25,21 +25,23 @@ await queue.add(
When a job is rate limited using `Worker.RateLimitError` and tried again, the `attempts` check is ignored, as rate limiting is not considered a real error. However, if you want to manually check the attempts and avoid retrying the job, you can do the following:

```typescript
import { Worker, UnrecoverableError } from 'bullmq';
import { Queue, RateLimitError, Worker, UnrecoverableError } from 'bullmq';

const queue = new Queue('myQueue', { connection });

const worker = new Worker(
'myQueue',
async job => {
const [isRateLimited, duration] = await doExternalCall();
if (isRateLimited) {
await worker.rateLimit(duration);
await queue.rateLimit(duration);
if (job.attemptsMade >= job.opts.attempts) {
throw new UnrecoverableError('Unrecoverable');
}
// Do not forget to throw this special exception,
// since we must differentiate this case from a failure
// in order to move the job to wait again.
throw Worker.RateLimitError();
throw new RateLimitError();
}
},
{
@@ -54,4 +56,4 @@ const worker = new Worker(

## Read more:

- 💡 [Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Worker.html#rateLimit)
- 💡 [Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#rateLimit)
3 changes: 1 addition & 2 deletions python/bullmq/job.py
@@ -13,8 +13,7 @@


optsDecodeMap = {
'fpof': 'failParentOnFailure',
'idof': 'ignoreDependencyOnFailure',
'ocf': 'onChildFailure',
'kl': 'keepLogs',
}

2 changes: 1 addition & 1 deletion python/bullmq/queue.py
@@ -141,7 +141,7 @@ async def getJobLogs(self, job_id:str, start = 0, end = -1, asc = True):
"logs": result[0],
"count": result[1]
}

async def obliterate(self, force: bool = False):
"""
Completely destroys the queue and all of its contents irreversibly.
51 changes: 30 additions & 21 deletions python/bullmq/scripts.py
@@ -22,6 +22,13 @@
basePath = os.path.dirname(os.path.realpath(__file__))


onChildFailureMap = {
'fail': 'f',
'ignore': 'i',
'remove': 'r',
'wait': 'w'
}
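For illustration (a hypothetical helper, not part of the actual `Scripts` API), this map compacts the verbose `onChildFailure` value into a single-letter code before job options are packed for the Lua scripts, while `optsDecodeMap` in `job.py` performs the reverse mapping when a job is read back:

```python
# Sketch: compacting and expanding the onChildFailure option.
# The maps mirror onChildFailureMap (scripts.py) and optsDecodeMap (job.py).
on_child_failure_map = {'fail': 'f', 'ignore': 'i', 'remove': 'r', 'wait': 'w'}
reverse_map = {v: k for k, v in on_child_failure_map.items()}

def compact_opts(opts: dict) -> dict:
    """Replace the verbose option with its compact single-letter form."""
    compacted = dict(opts)
    if 'onChildFailure' in compacted:
        compacted['ocf'] = on_child_failure_map[compacted.pop('onChildFailure')]
    return compacted

def expand_opts(opts: dict) -> dict:
    """Restore the verbose option name from its compact form."""
    expanded = dict(opts)
    if 'ocf' in expanded:
        expanded['onChildFailure'] = reverse_map[expanded.pop('ocf')]
    return expanded
```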

class Scripts:

def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection):
@@ -31,11 +38,11 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
self.redisConnection = redisConnection
self.redisClient = redisConnection.conn
self.commands = {
"addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-8.lua")),
"addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-7.lua")),
"addDelayedJob": self.redisClient.register_script(self.getScript("addDelayedJob-6.lua")),
"addParentJob": self.redisClient.register_script(self.getScript("addParentJob-4.lua")),
"addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-8.lua")),
"changePriority": self.redisClient.register_script(self.getScript("changePriority-7.lua")),
"changePriority": self.redisClient.register_script(self.getScript("changePriority-6.lua")),
"cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-3.lua")),
"extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")),
"getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")),
@@ -44,18 +51,19 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"getState": self.redisClient.register_script(self.getScript("getState-8.lua")),
"getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")),
"isJobInList": self.redisClient.register_script(self.getScript("isJobInList-1.lua")),
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-9.lua")),
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-11.lua")),
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")),
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-10.lua")),
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-14.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-13.lua")),
"moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-5.lua")),
"obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-7.lua")),
"promote": self.redisClient.register_script(self.getScript("promote-9.lua")),
"promote": self.redisClient.register_script(self.getScript("promote-8.lua")),
"removeJob": self.redisClient.register_script(self.getScript("removeJob-2.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-8.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-11.lua")),
"moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-8.lua")),
"migrateDeprecatedPausedKey": self.redisClient.register_script(self.getScript("migrateDeprecatedPausedKey-2.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-7.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-10.lua")),
"moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-7.lua")),
"saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")),
"updateData": self.redisClient.register_script(self.getScript("updateData-1.lua")),
"updateProgress": self.redisClient.register_script(self.getScript("updateProgress-3.lua")),
@@ -93,6 +101,12 @@ def addJobArgs(self, job: Job, waiting_children_key: str|None):
jsonData = json.dumps(job.data, separators=(',', ':'), allow_nan=False)
packedOpts = msgpack.packb(job.opts)

parent = {}
parent.update(job.parent or {})
parent.update({
'ocf': onChildFailureMap.get(job.opts.get("ocf", ''), None)
})

parent = job.parent
parentKey = job.parentKey

@@ -118,7 +132,7 @@ def addStandardJob(self, job: Job, timestamp: int, pipe = None):
"""
Add a standard job to the queue
"""
keys = self.getKeys(['wait', 'paused', 'meta', 'id',
keys = self.getKeys(['wait', 'meta', 'id',
'completed', 'active', 'events', 'marker'])
args = self.addJobArgs(job, None)
args.append(timestamp)
@@ -246,15 +260,15 @@ def saveStacktraceArgs(self, job_id: str, stacktrace: str, failedReason: str):
return (keys, args)

def retryJobArgs(self, job_id: str, lifo: bool, token: str, opts: dict = {}):
keys = self.getKeys(['active', 'wait', 'paused'])
keys = self.getKeys(['active', 'wait'])
keys.append(self.toKey(job_id))
keys.append(self.keys['meta'])
keys.append(self.keys['events'])
keys.append(self.keys['delayed'])
keys.append(self.keys['prioritized'])
keys.append(self.keys['pc'])
keys.append(self.keys['marker'])
keys.append(self.keys['stalled'])
keys.append(self.keys['marker'])

push_cmd = "RPUSH" if lifo else "LPUSH"

@@ -289,7 +303,6 @@ def promoteArgs(self, job_id: str):
keys = self.getKeys(['delayed', 'wait', 'paused', 'meta', 'prioritized', 'active', 'pc', 'events', 'marker'])
keys.append(self.toKey(job_id))
keys.append(self.keys['events'])
keys.append(self.keys['paused'])
keys.append(self.keys['meta'])

args = [self.keys[''], job_id]
@@ -361,7 +374,6 @@ async def isJobInList(self, list_key: str, job_id: str):

async def changePriority(self, job_id: str, priority:int = 0, lifo:bool = False):
keys = [self.keys['wait'],
self.keys['paused'],
self.keys['meta'],
self.keys['prioritized'],
self.keys['active'],
@@ -395,7 +407,6 @@ async def reprocessJob(self, job: Job, state: str):
keys.append(self.keys[state])
keys.append(self.keys['wait'])
keys.append(self.keys['meta'])
keys.append(self.keys['paused'])
keys.append(self.keys['active'])
keys.append(self.keys['marker'])

@@ -437,7 +448,7 @@ async def obliterate(self, count: int, force: bool = False):

def moveJobsToWaitArgs(self, state: str, count: int, timestamp: int) -> int:
keys = self.getKeys(
['', 'events', state, 'wait', 'paused', 'meta', 'active', 'marker'])
['', 'events', state, 'wait', 'meta', 'active', 'marker'])

args = [count or 1000, timestamp or round(time.time()*1000), state]
return (keys, args)
@@ -470,7 +481,7 @@ async def moveToActive(self, token: str, opts: dict) -> list[Any]:
limiter = opts.get("limiter", None)

keys = self.getKeys(['wait', 'active', 'prioritized', 'events',
'stalled', 'limiter', 'delayed', 'paused', 'meta', 'pc', 'marker'])
'stalled', 'limiter', 'delayed', 'meta', 'pc', 'marker'])
packedOpts = msgpack.packb(
{"token": token, "lockDuration": lockDuration, "limiter": limiter}, use_bin_type=True)
args = [self.keys[''], timestamp, packedOpts]
@@ -503,7 +514,7 @@ def moveToFinishedArgs(self, job: Job, val: Any, propVal: str, shouldRemove, tar
metricsKey = self.toKey('metrics:' + target)

keys = self.getKeys(['wait', 'active', 'prioritized', 'events',
'stalled', 'limiter', 'delayed', 'paused', 'meta', 'pc', target])
'stalled', 'limiter', 'delayed', 'meta', 'pc', target])
keys.append(self.toKey(job.id))
keys.append(metricsKey)
keys.append(self.keys['marker'])
@@ -537,8 +548,6 @@ def getMetricsSize(opts: dict):
"attempts": job.attempts,
"attemptsMade": job.attemptsMade,
"maxMetricsSize": getMetricsSize(opts),
"fpof": opts.get("failParentOnFailure", False),
"idof": opts.get("ignoreDependencyOnFailure", False)
}, use_bin_type=True)

args = [job.id, timestamp, propVal, transformed_value or "", target,
@@ -569,7 +578,7 @@ def extendLock(self, jobId: str, token: str, duration: int, client: Redis = None

def moveStalledJobsToWait(self, maxStalledCount: int, stalledInterval: int):
keys = self.getKeys(['stalled', 'wait', 'active', 'failed',
'stalled-check', 'meta', 'paused', 'marker', 'events'])
'stalled-check', 'meta', 'marker', 'events'])
args = [maxStalledCount, self.keys[''], round(
time.time() * 1000), stalledInterval]
return self.commands["moveStalledJobsToWait"](keys, args)
1 change: 0 additions & 1 deletion src/classes/index.ts
@@ -15,7 +15,6 @@ export * from './queue-getters';
export * from './queue-keys';
export * from './queue';
export * from './redis-connection';
export * from './repeat';
export * from './sandbox';
export * from './scripts';
export * from './worker';