diff --git a/creator-node/src/AsyncProcessingQueue.js b/creator-node/src/AsyncProcessingQueue.js index de1fbeb29dc..ffb21ad58ef 100644 --- a/creator-node/src/AsyncProcessingQueue.js +++ b/creator-node/src/AsyncProcessingQueue.js @@ -27,6 +27,8 @@ const PROCESS_STATES = Object.freeze({ FAILED: 'FAILED' }) +const ASYNC_PROCESSING_QUEUE_HISTORY = 500 + /** * This queue accepts jobs (any function) that needs to be processed asynchonously. * Once the job is complete, the response is added to redis. The response can be @@ -42,8 +44,8 @@ class AsyncProcessingQueue { port: config.get('redisPort') }, defaultJobOptions: { - removeOnComplete: true, - removeOnFail: true + removeOnComplete: ASYNC_PROCESSING_QUEUE_HISTORY, + removeOnFail: ASYNC_PROCESSING_QUEUE_HISTORY } }) diff --git a/creator-node/src/ImageProcessingQueue.js b/creator-node/src/ImageProcessingQueue.js index 710e1864e4d..af8d5ce89b2 100644 --- a/creator-node/src/ImageProcessingQueue.js +++ b/creator-node/src/ImageProcessingQueue.js @@ -18,6 +18,8 @@ const MAX_CONCURRENCY = ? imageProcessingMaxConcurrency : os.cpus().length +const IMAGE_PROCESSING_QUEUE_HISTORY = 500 + class ImageProcessingQueue { constructor() { this.queue = new Bull('image-processing-queue', { @@ -26,8 +28,8 @@ class ImageProcessingQueue { host: config.get('redisHost') }, defaultJobOptions: { - removeOnComplete: true, - removeOnFail: true + removeOnComplete: IMAGE_PROCESSING_QUEUE_HISTORY, + removeOnFail: IMAGE_PROCESSING_QUEUE_HISTORY } }) diff --git a/creator-node/src/TranscodingQueue.js b/creator-node/src/TranscodingQueue.js index f755893dcad..fb6fa07abdf 100644 --- a/creator-node/src/TranscodingQueue.js +++ b/creator-node/src/TranscodingQueue.js @@ -21,6 +21,8 @@ const PROCESS_NAMES = Object.freeze({ transcode320: 'transcode_320' }) +const TRANSCODING_QUEUE_HISTORY = 500 + class TranscodingQueue { constructor() { this.queue = new Bull('transcoding-queue', { @@ -29,8 +31,8 @@ class TranscodingQueue { host: config.get('redisHost') }, defaultJobOptions: { - removeOnComplete: true, - removeOnFail: true + removeOnComplete: TRANSCODING_QUEUE_HISTORY, + removeOnFail: TRANSCODING_QUEUE_HISTORY } }) this.logStatus('Initialized TranscodingQueue') diff --git a/creator-node/src/monitors/MonitoringQueue.js b/creator-node/src/monitors/MonitoringQueue.js index 6f3610808f6..dd3c46dea5e 100644 --- a/creator-node/src/monitors/MonitoringQueue.js +++ b/creator-node/src/monitors/MonitoringQueue.js @@ -10,6 +10,8 @@ const PROCESS_NAMES = Object.freeze({ monitor: 'monitor' }) +const MONITORING_QUEUE_HISTORY = 500 + /** * A persistent cron-style queue that periodically monitors various * health metrics and caches values in redis. @@ -27,8 +29,8 @@ class MonitoringQueue { host: config.get('redisHost') }, defaultJobOptions: { - removeOnComplete: true, - removeOnFail: true + removeOnComplete: MONITORING_QUEUE_HISTORY, + removeOnFail: MONITORING_QUEUE_HISTORY } }) diff --git a/creator-node/src/services/sync/skippedCIDsRetryService.js b/creator-node/src/services/sync/skippedCIDsRetryService.js index 2883221e600..3cad1b8caf2 100644 --- a/creator-node/src/services/sync/skippedCIDsRetryService.js +++ b/creator-node/src/services/sync/skippedCIDsRetryService.js @@ -7,6 +7,8 @@ const { saveFileForMultihashToFS } = require('../../fileManager') const LogPrefix = '[SkippedCIDsRetryQueue]' +const RETRY_QUEUE_HISTORY = 500 + /** * TODO - consider moving queue/jobs off main process. Will require re-factoring of job processing / dependencies */ @@ -23,8 +25,8 @@ class SkippedCIDsRetryQueue { }, defaultJobOptions: { // these required since completed/failed jobs data set can grow infinitely until memory exhaustion - removeOnComplete: true, - removeOnFail: true + removeOnComplete: RETRY_QUEUE_HISTORY, + removeOnFail: RETRY_QUEUE_HISTORY } }) diff --git a/creator-node/src/services/sync/syncQueue.js b/creator-node/src/services/sync/syncQueue.js index 4934d71a5d2..06fdd529548 100644 --- a/creator-node/src/services/sync/syncQueue.js +++ b/creator-node/src/services/sync/syncQueue.js @@ -3,6 +3,8 @@ const Bull = require('bull') const { logger } = require('../../logging') const secondarySyncFromPrimary = require('./secondarySyncFromPrimary') +const SYNC_QUEUE_HISTORY = 500 + /** * SyncQueue - handles enqueuing and processing of Sync jobs on secondary * sync job = this node (secondary) will sync data for a user from their primary @@ -24,8 +26,8 @@ class SyncQueue { port: this.nodeConfig.get('redisPort') }, defaultJobOptions: { - removeOnComplete: true, - removeOnFail: true + removeOnComplete: SYNC_QUEUE_HISTORY, + removeOnFail: SYNC_QUEUE_HISTORY } })