Skip to content

Commit

Permalink
Merge pull request #4464 from omnivore-app/jacksonh/queue-cleanup
Browse files Browse the repository at this point in the history
Clean up queue processing
  • Loading branch information
jacksonh authored Oct 30, 2024
2 parents 818ba1a + 400e694 commit c5343b1
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 48 deletions.
4 changes: 4 additions & 0 deletions packages/api/src/jobs/export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ const uploadToBucket = async (

export const exportJob = async (jobData: ExportJobData) => {
const { userId, exportId } = jobData
logger.info('starting export job', {
userId,
exportId,
})

try {
const user = await findActiveUser(userId)
Expand Down
36 changes: 18 additions & 18 deletions packages/api/src/jobs/process-youtube-video.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,24 +281,24 @@ export const processYouTubeVideo = async (
updatedLibraryItem.publishedAt = new Date(video.uploadDate)
}

if ('getTranscript' in video && duration > 0 && duration < 1801) {
// If the video has a transcript available, put a placehold in and
// enqueue a job to process the full transcript
const updatedContent = await addTranscriptToReadableContent(
libraryItem.originalUrl,
libraryItem.readableContent,
TRANSCRIPT_PLACEHOLDER_TEXT
)

if (updatedContent) {
updatedLibraryItem.readableContent = updatedContent
}

await enqueueProcessYouTubeTranscript({
videoId,
...jobData,
})
}
// if ('getTranscript' in video && duration > 0 && duration < 1801) {
// // If the video has a transcript available, put a placehold in and
// // enqueue a job to process the full transcript
// const updatedContent = await addTranscriptToReadableContent(
// libraryItem.originalUrl,
// libraryItem.readableContent,
// TRANSCRIPT_PLACEHOLDER_TEXT
// )

// if (updatedContent) {
// updatedLibraryItem.readableContent = updatedContent
// }

// await enqueueProcessYouTubeTranscript({
// videoId,
// ...jobData,
// })
// }

if (updatedLibraryItem !== {}) {
await updateLibraryItem(
Expand Down
60 changes: 30 additions & 30 deletions packages/api/src/queue-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,25 +159,25 @@ export const createWorker = (connection: ConnectionOptions) =>
async (job: Job) => {
const executeJob = async (job: Job) => {
switch (job.name) {
case 'refresh-all-feeds': {
const queue = await getQueue()
const counts = await queue?.getJobCounts('prioritized')
if (counts && counts.wait > 1000) {
return
}
return await refreshAllFeeds(appDataSource)
}
case 'refresh-feed': {
return await refreshFeed(job.data)
}
// case 'refresh-all-feeds': {
// const queue = await getQueue()
// const counts = await queue?.getJobCounts('prioritized')
// if (counts && counts.wait > 1000) {
// return
// }
// return await refreshAllFeeds(appDataSource)
// }
// case 'refresh-feed': {
// return await refreshFeed(job.data)
// }
case 'save-page': {
return savePageJob(job.data, job.attemptsMade)
}
case 'update-pdf-content': {
return updatePDFContentJob(job.data)
}
case THUMBNAIL_JOB:
return findThumbnail(job.data)
// case 'update-pdf-content': {
// return updatePDFContentJob(job.data)
// }
// case THUMBNAIL_JOB:
// return findThumbnail(job.data)
case TRIGGER_RULE_JOB_NAME:
return triggerRule(job.data)
case UPDATE_LABELS_JOB:
Expand All @@ -192,12 +192,12 @@ export const createWorker = (connection: ConnectionOptions) =>
return callWebhook(job.data)
case EXPORT_ITEM_JOB_NAME:
return exportItem(job.data)
case AI_SUMMARIZE_JOB_NAME:
return aiSummarize(job.data)
case PROCESS_YOUTUBE_VIDEO_JOB_NAME:
return processYouTubeVideo(job.data)
case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME:
return processYouTubeTranscript(job.data)
// case AI_SUMMARIZE_JOB_NAME:
// return aiSummarize(job.data)
// case PROCESS_YOUTUBE_VIDEO_JOB_NAME:
// return processYouTubeVideo(job.data)
// case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME:
// return processYouTubeTranscript(job.data)
case EXPORT_ALL_ITEMS_JOB_NAME:
return exportAllItems(job.data)
case SEND_EMAIL_JOB:
Expand All @@ -210,16 +210,16 @@ export const createWorker = (connection: ConnectionOptions) =>
return saveNewsletterJob(job.data)
case FORWARD_EMAIL_JOB:
return forwardEmailJob(job.data)
case CREATE_DIGEST_JOB:
return createDigest(job.data)
// case CREATE_DIGEST_JOB:
// return createDigest(job.data)
case UPLOAD_CONTENT_JOB:
return uploadContentJob(job.data)
case UPDATE_HOME_JOB:
return updateHome(job.data)
case SCORE_LIBRARY_ITEM_JOB:
return scoreLibraryItem(job.data)
case GENERATE_PREVIEW_CONTENT_JOB:
return generatePreviewContent(job.data)
// case UPDATE_HOME_JOB:
// return updateHome(job.data)
// case SCORE_LIBRARY_ITEM_JOB:
// return scoreLibraryItem(job.data)
// case GENERATE_PREVIEW_CONTENT_JOB:
// return generatePreviewContent(job.data)
case PRUNE_TRASH_JOB:
return pruneTrashJob(job.data)
case EXPIRE_FOLDERS_JOB_NAME:
Expand Down

0 comments on commit c5343b1

Please sign in to comment.