Skip to content

Commit

Permalink
Added job options for repeat, and cron.
Browse files Browse the repository at this point in the history
* Added CLI functions to drain, pause, and unpause queues.
  • Loading branch information
rizen committed Apr 24, 2024
1 parent a770cd5 commit 7a1cad9
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 13 deletions.
3 changes: 2 additions & 1 deletion docs/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ outline: deep
* NOTE: Because of the above you may want to check out the new acceptedFileExtensions attribute in ving schemas and migrate your S3File integrations to use it.
* Implemented: add a display of an s3file thumbnail to the page generator #105
* Fixed: pulumi doesn't create the nodmods.zip file as it should #110
* Added a priority option to jobs.
* Added job options for priority, repeat, and cron.
* Added a job handler generator to the CLI.
* Added CLI functions to drain, pause, and unpause queues.

## 2024-04-23
* Created SelectInput component to replace FormSelect. However, you should use FormInput with type select instead of using this directly in most cases.
Expand Down
82 changes: 72 additions & 10 deletions ving/cli/jobs.mjs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { defineCommand, showUsage } from "citty";
import { VingJobWorker } from '#ving/jobs/worker.mjs';
import { pause, resume, drain } from '#ving/jobs/queue.mjs';
import { generateJobHandler } from '#ving/generator/jobhandler.mjs';
import ving from '#ving/index.mjs';

Expand All @@ -9,19 +10,37 @@ export default defineCommand({
description: "Manage background jobs",
},
args: {
worker: {
type: "boolean",
description: "Start a worker",
alias: "w",
default: false,
},
queueName: {
type: "string",
description: "Set a queue name. Defaults to `jobs`.",
valueHint: 'jobs',
default: 'jobs',
alias: 'q',
},
drain: {
type: "boolean",
description: "Delete all waiting and delayed jobs in the queue.",
alias: "D",
default: false,
},
pause: {
type: "boolean",
description: "Stop workers from executing jobs in the queue.",
alias: "P",
default: false,
},
resume: {
type: "boolean",
description: "Resume workers executing jobs in the queue.",
alias: "R",
default: false,
},
worker: {
type: "boolean",
description: "Start a worker",
alias: "w",
default: false,
},
ttl: {
type: "number",
description: "How many seconds should the worker run? 0 means run indefinitely. Defaults to `0`.",
Expand All @@ -38,8 +57,32 @@ export default defineCommand({
type: "string",
description: "Specify a JSON string of data you'd like to pass into the job.",
default: '{}',
alias: 'j',
},
cron: {
type: "string",
description: "Specify a 5 parameter cron string for how often the new job should run.",
valueHint: '* * * * * ',
alias: 'c',
},
repeat: {
type: "number",
description: "Specify a number of milliseconds to wait before this job is executed again.",
valueHint: 30000,
alias: 'r',
},
delay: {
type: "number",
description: "The number of milliseconds to wait before executing this job.",
valueHint: 30000,
alias: 'd',
},
priority: {
type: "number",
description: "A number ranging from `1` to `2097152` where `1` is the highest possible priority. Defaults to `2097152`.",
valueHint: 100,
alias: 'p',
},
handler: {
type: 'string',
description: 'Generate a new job handler with the specified name.',
Expand All @@ -49,7 +92,19 @@ export default defineCommand({
},
async run({ args, cmd }) {
try {
if (args.worker) {
if (args.drain) {
await drain({ queueName: args.queueName });
ving.close();
}
else if (args.pause) {
await pause({ queueName: args.queueName });
ving.close();
}
else if (args.resume) {
await resume({ queueName: args.queueName });
ving.close();
}
else if (args.worker) {
const worker = new VingJobWorker(args.queueName);
await worker.start();
if (args.ttl > 0) {
Expand All @@ -63,9 +118,16 @@ export default defineCommand({
ving.close();
}
else if (args.addJob) {
await ving.addJob(args.addJob, JSON.parse(args.jobData), {
queueName: args.queueName,
});
const params = { queueName: args.queueName };
if (args.cron)
params.cron = args.cron;
if (args.repeat)
params.repeat = args.repeat;
if (args.priority)
params.priority = args.priority;
if (args.delay)
params.delay = args.delay;
await ving.addJob(args.addJob, JSON.parse(args.jobData), params);
if (!args.worker)
ving.close();
}
Expand Down
67 changes: 65 additions & 2 deletions ving/jobs/queue.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@ import { Queue } from 'bullmq';
import ving from '#ving/index.mjs';
import { useRedis } from '#ving/redis.mjs';

/**
* Get BullMQ queue object.
* @param {Object} options An object with optional properties.
* @param {string} options.queueName The name of the queue. Defaults to `jobs`.
* @returns {Queue} A queue object.
* @example
* getQueue();
*/
export const getQueue = (options = {}) => {
const queue = new Queue(options?.queueName || 'jobs');
return queue;
}

/**
* Enqueues a job in the jobs system.
*
Expand All @@ -11,9 +24,13 @@ import { useRedis } from '#ving/redis.mjs';
* @param {string} options.queueName The name of the queue to add this job to. Defaults to `jobs`.
* @param {number} options.delay The number of milliseconds to wait before executing this job. Defaults to running as soon as possible.
* @param {number} options.priority A number ranging from `1` to `2097152` where `1` is the highest possible priority. Defaults to `2097152`.
* @param {number} options.repeat Specify a number of milliseconds to wait before this job is executed again. Leave blank for no repeat.
* @param {string} options.cron Specify a cron string for how often this job should be executed. Leave blank for no repeat.
* @example
* await addJob('Test', {foo:'bar'});
*/
export const addJob = async (type, data = {}, options = { queueName: 'jobs ' }) => {
const queue = new Queue(options?.queueName || 'jobs');
export const addJob = async (type, data = {}, options = { queueName: 'jobs' }) => {
const queue = getQueue(options);
const jobOptions = {
connection: useRedis(),
removeOnComplete: {
Expand All @@ -29,7 +46,53 @@ export const addJob = async (type, data = {}, options = { queueName: 'jobs ' })
jobOptions.delay = options?.delay;
if (options?.priority)
jobOptions.priority = options?.priority;
if (options?.repeat)
jobOptions.repeat = { every: options?.repeat };
if (options?.cron)
jobOptions.repeat = { cron: options?.cron };
const job = await queue.add(type, data, jobOptions);
ving.log('jobs').info(`Job ${job.id} ${job.name} enqueued.`);
await queue.close();
}

/**
* Stop workers from executing jobs in the queue.
* @param {Object} options An object with optional properties.
* @param {string} options.queueName The name of the queue. Defaults to `jobs`.
* @example
* await pause();
*/
export const pause = async (options = {}) => {
const queue = getQueue(options);
await queue.pause();
queue.disconnect();
ving.log('jobs').info(`Queue ${queue.name} paused.`);
}

/**
* Resume workers executing jobs in the queue. Undo `pause()`.
* @param {Object} options An object with optional properties.
* @param {string} options.queueName The name of the queue. Defaults to `jobs`.
* @example
* await resume();
*/
export const resume = async (options = {}) => {
const queue = getQueue(options);
await queue.resume();
queue.disconnect();
ving.log('jobs').info(`Queue ${queue.name} resumed.`);
}

/**
* Delete all waiting and delayed jobs in the queue.
* @param {Object} options An object with optional properties.
* @param {string} options.queueName The name of the queue. Defaults to `jobs`.
* @example
* await drain();
*/
export const drain = async (options = {}) => {
const queue = getQueue(options);
await queue.drain();
queue.disconnect();
ving.log('jobs').info(`Queue ${queue.name} drained.`);
}

0 comments on commit 7a1cad9

Please sign in to comment.