diff --git a/src/definitions.ts b/src/definitions.ts index 22431be..05f96f3 100644 --- a/src/definitions.ts +++ b/src/definitions.ts @@ -11,4 +11,5 @@ export type Options = { }; redis?: RedisOptions | Redis; redisEvents?: RedisOptions | Redis; + maxSizeOfJobData?: number /** in bytes */; }; diff --git a/src/helpers/roughSizeOfObject.ts b/src/helpers/roughSizeOfObject.ts new file mode 100644 index 0000000..46c85b0 --- /dev/null +++ b/src/helpers/roughSizeOfObject.ts @@ -0,0 +1,36 @@ +import { Options } from '../definitions'; + +function roughSizeOfObject(object: unknown): number { + const objectList = []; + const stack = [object]; + let bytes = 0; + + while (stack.length) { + const value = stack.pop(); + + if (typeof value === 'boolean') { + bytes += 4; + } else if (typeof value === 'string') { + bytes += value.length * 2; + } else if (typeof value === 'number') { + bytes += 8; + } else if (typeof value === 'object' && objectList.indexOf(value as never) === -1) { + objectList.push(value as never); + + for (const i in value) { + stack.push(value[i]); + } + } + } + return bytes; +} + +export const defaultMaxSizeOfObjectData = 10000; + +export function checkJobDataSize(opts: Options, data: unknown): void { + const allowableSize = opts?.maxSizeOfJobData || defaultMaxSizeOfObjectData; + const size = roughSizeOfObject(data); + if (size > allowableSize) { + throw new Error(`Job data ${size} exceeds allowable size ${allowableSize}`); + } +} diff --git a/src/mutation/jobAdd.ts b/src/mutation/jobAdd.ts index 1ed1c2e..2217845 100644 --- a/src/mutation/jobAdd.ts +++ b/src/mutation/jobAdd.ts @@ -3,6 +3,7 @@ import { getJobTC } from '../types/job/Job'; import { findQueue } from '../helpers'; import { Options } from '../definitions'; import { createJobDataITC } from '../types/job/JobInput'; +import { checkJobDataSize } from '../helpers/roughSizeOfObject'; export function createJobAddFC( sc: SchemaComposer, @@ -44,6 +45,7 @@ export function createJobAddFC( }), }, resolve: async (_, { prefix, queueName, jobName, data, options }) => { + checkJobDataSize(opts, data); const queue = await findQueue(prefix, queueName, opts); const job = await queue.add(jobName, data, options); return { diff --git a/src/mutation/jobAddBulk.ts b/src/mutation/jobAddBulk.ts index a8c154a..42a0631 100644 --- a/src/mutation/jobAddBulk.ts +++ b/src/mutation/jobAddBulk.ts @@ -2,6 +2,7 @@ import { findQueue } from '../helpers'; import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose'; import { getJobTC } from '../types/job/Job'; import { Options } from '../definitions'; +import { checkJobDataSize } from '../helpers/roughSizeOfObject'; export function createJobAddBulkFC( sc: SchemaComposer, @@ -46,6 +47,14 @@ export function createJobAddBulkFC( }).List, }, resolve: async (_, { prefix, queueName, jobs }) => { + if (Array.isArray(jobs)) { + for (const job of jobs) { + checkJobDataSize(opts, job); + } + } else { + throw new Error('jobAddBulk: jobs argument must be an array'); + } + const queue = await findQueue(prefix, queueName, opts); const jobsRes = await queue.addBulk(jobs); return { diff --git a/src/mutation/jobAddCron.ts b/src/mutation/jobAddCron.ts index 5f2fe1b..bc71389 100644 --- a/src/mutation/jobAddCron.ts +++ b/src/mutation/jobAddCron.ts @@ -2,6 +2,7 @@ import { findQueue } from '../helpers'; import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose'; import { getJobTC } from '../types/job/Job'; import { Options } from '../definitions'; +import { checkJobDataSize } from '../helpers/roughSizeOfObject'; export function createJobAddCronFC( sc: SchemaComposer, @@ -54,6 +55,7 @@ export function createJobAddCronFC( }), }, resolve: async (_, { prefix, queueName, jobName, data, options }) => { + checkJobDataSize(opts, data); const queue = await findQueue(prefix, queueName, opts); const job = await queue.add(jobName, data, options); return { diff --git a/src/mutation/jobAddEvery.ts b/src/mutation/jobAddEvery.ts index 62f4f6a..582f960 100644 --- a/src/mutation/jobAddEvery.ts +++ b/src/mutation/jobAddEvery.ts @@ -2,6 +2,7 @@ import { findQueue } from '../helpers'; import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose'; import { getJobTC } from '../types/job/Job'; import { Options } from '../definitions'; +import { checkJobDataSize } from '../helpers/roughSizeOfObject'; export function createJobAddEveryFC( sc: SchemaComposer, @@ -53,6 +54,7 @@ export function createJobAddEveryFC( }), }, resolve: async (_, { prefix, queueName, jobName, data, options }) => { + checkJobDataSize(opts, data); const queue = await findQueue(prefix, queueName, opts); const job = await queue.add(jobName, data, options); return { diff --git a/src/types/queue/Queue.durationAvg.ts b/src/types/queue/Queue.durationAvg.ts index d42de9b..605145b 100644 --- a/src/types/queue/Queue.durationAvg.ts +++ b/src/types/queue/Queue.durationAvg.ts @@ -8,7 +8,7 @@ export function createDurationAvgFC( opts: Options ): ObjectTypeComposerFieldConfigDefinition { return { - type: 'Int!', + type: 'String!', args: { limit: { type: 'Int', @@ -20,7 +20,7 @@ export function createDurationAvgFC( let amount = 0; let counter = 0; if (jobs.length === 0) { - return 0; + return '0'; } else { for (const job of jobs) { if (job?.finishedOn && job?.processedOn) { @@ -28,7 +28,7 @@ export function createDurationAvgFC( counter++; } } - return (amount / (counter || 1)).toFixed(0); + return String((amount / (counter || 1)).toFixed(0)); } }, };