From b337b08d5a800d1199eeb6129bfbc145ea50a16b Mon Sep 17 00:00:00 2001 From: Sid Sethi Date: Mon, 19 Dec 2022 22:47:28 +0000 Subject: [PATCH 1/4] wip --- creator-node/package-lock.json | 2 +- creator-node/src/diskManager.ts | 77 +++++++++++-------- .../utils/cluster/clusterUtilsForPrimary.ts | 10 +++ 3 files changed, 55 insertions(+), 34 deletions(-) diff --git a/creator-node/package-lock.json b/creator-node/package-lock.json index 3c5fcb8f1dc..192fc8d7061 100644 --- a/creator-node/package-lock.json +++ b/creator-node/package-lock.json @@ -3549,7 +3549,7 @@ "@types/json5": { "version": "0.0.29", "resolved": "https://registry.npmjs.org/@types/json5/-/json5-0.0.29.tgz", - "integrity": "sha512-dRLjCWHYg4oaA77cxO64oO+7JwCwnIzkZPdrrC71jQmQtlhM556pwKo5bUzqvZndkVbeFLIIi+9TC40JNF5hNQ==", + "integrity": "sha1-7ihweulOEdK4J7y+UnC86n8+ce4=", "dev": true }, "@types/lodash": { diff --git a/creator-node/src/diskManager.ts b/creator-node/src/diskManager.ts index f4b50e3aa9d..6997911889a 100644 --- a/creator-node/src/diskManager.ts +++ b/creator-node/src/diskManager.ts @@ -920,43 +920,54 @@ export async function migrateFilesWithNonStandardStoragePaths( prometheusRegistry: any, logger: Logger ): Promise { - // Reset gauges on each run so the metrics aren't infinitely increasing - _resetStoragePathMetrics(prometheusRegistry, logger) + if ( + !config.get('migrateFilesWithLegacyStoragePath') && + !config.get('migrateFilesWithCustomStoragePath') + ) { + return + } const BATCH_SIZE = 5_000 - // Legacy storagePaths - if (config.get('migrateFilesWithLegacyStoragePath')) { - try { - await _migrateNonDirFilesWithLegacyStoragePaths( - queryDelayMs, - BATCH_SIZE, - prometheusRegistry, - logger - ) - await _migrateDirsWithLegacyStoragePaths(queryDelayMs, BATCH_SIZE, logger) - } catch (e: any) { - logger.error(`Error migrating legacy storagePaths: ${e}`) + + while (true) { + // Reset gauges on each run so the metrics aren't infinitely increasing + _resetStoragePathMetrics(prometheusRegistry, logger) + + // Legacy storagePaths + if (config.get('migrateFilesWithLegacyStoragePath')) { + try { + await _migrateNonDirFilesWithLegacyStoragePaths( + queryDelayMs, + BATCH_SIZE, + prometheusRegistry, + logger + ) + await _migrateDirsWithLegacyStoragePaths( + queryDelayMs, + BATCH_SIZE, + logger + ) + } catch (e: any) { + logger.error(`Error migrating legacy storagePaths: ${e}`) + } } - } - // Custom storagePaths (not matching 'storagePath' env var) - if (config.get('migrateFilesWithCustomStoragePath')) { - try { - await _migrateFilesWithCustomStoragePaths( - queryDelayMs, - BATCH_SIZE, - prometheusRegistry, - logger - ) - } catch (e: any) { - logger.error(`Error migrating custom storagePaths: ${e}`) + // Custom storagePaths (not matching 'storagePath' env var) + if (config.get('migrateFilesWithCustomStoragePath')) { + try { + await _migrateFilesWithCustomStoragePaths( + queryDelayMs, + BATCH_SIZE, + prometheusRegistry, + logger + ) + } catch (e: any) { + logger.error(`Error migrating custom storagePaths: ${e}`) + } } - } - // Keep calling this function recursively without an await so the original function scope can close - return migrateFilesWithNonStandardStoragePaths( - 5000, - prometheusRegistry, - logger - ) + queryDelayMs = 5000 + + await timeout(queryDelayMs) + } } diff --git a/creator-node/src/utils/cluster/clusterUtilsForPrimary.ts b/creator-node/src/utils/cluster/clusterUtilsForPrimary.ts index b5a8460ca7b..a9331add25b 100644 --- a/creator-node/src/utils/cluster/clusterUtilsForPrimary.ts +++ b/creator-node/src/utils/cluster/clusterUtilsForPrimary.ts @@ -50,6 +50,11 @@ class ClusterUtilsForPrimary { prometheusRegistry: any, logger: Logger ) { + logger.debug( + `Creating msgToRecordMetric to all workers from ${JSON.stringify( + metricToRecord + )}` + ) const validatedMetricToRecord = promUtils().validateMetricToRecord(metricToRecord) // Non-cluster mode can just record the metric now @@ -64,6 +69,11 @@ class ClusterUtilsForPrimary { cmd: 'recordMetric', val: validatedMetricToRecord } + logger.debug( + `Sending msgToRecordMetric to all workers: ${JSON.stringify( + msgToRecordMetric + )}` + ) // Send out to all workers. Only the special worker will end up recording it for (const id in cluster.workers) { const worker = cluster.workers?.[id] From 2a3363bca6d86fd48920efc1c0a46c571312d535 Mon Sep 17 00:00:00 2001 From: Sid Sethi Date: Mon, 19 Dec 2022 22:52:26 +0000 Subject: [PATCH 2/4] wip2 --- creator-node/src/diskManager.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/creator-node/src/diskManager.ts b/creator-node/src/diskManager.ts index 6997911889a..d58ad00a462 100644 --- a/creator-node/src/diskManager.ts +++ b/creator-node/src/diskManager.ts @@ -425,8 +425,10 @@ export async function sweepSubdirectoriesInFiles( // keep calling this function recursively without an await so the original function scope can close // Only call again if backgroundDiskCleanupDeleteEnabled = true, to prevent re-processing infinitely - if (redoJob && config.get('backgroundDiskCleanupDeleteEnabled')) + if (redoJob && config.get('backgroundDiskCleanupDeleteEnabled')) { + await timeout(1000) return sweepSubdirectoriesInFiles() + } } /** From d825531e385aec0ced3e3dc929e2d632fe3cb196 Mon Sep 17 00:00:00 2001 From: Sid Sethi Date: Mon, 19 Dec 2022 23:02:42 +0000 Subject: [PATCH 3/4] wip --- creator-node/src/diskManager.ts | 171 ++++++++++++++++---------------- creator-node/src/index.ts | 18 ++-- 2 files changed, 96 insertions(+), 93 deletions(-) diff --git a/creator-node/src/diskManager.ts b/creator-node/src/diskManager.ts index d58ad00a462..99346e5d85b 100644 --- a/creator-node/src/diskManager.ts +++ b/creator-node/src/diskManager.ts @@ -334,100 +334,103 @@ export async function listNestedCIDsInFilePath( export async function sweepSubdirectoriesInFiles( redoJob = true ): Promise { - const subdirectories = await listSubdirectoriesInFiles() - if (!subdirectories) return - - for (let i = 0; i < subdirectories.length; i += 1) { - try { - const subdirectory = subdirectories[i] - - const cidsToFilePathMap = await listNestedCIDsInFilePath(subdirectory) - const cidsInSubdirectory = Object.keys(cidsToFilePathMap) + // infinite while loop with terminal conditions and a delay between iterations + while (true) { + const subdirectories = await listSubdirectoriesInFiles() + if (!subdirectories) return - if (cidsInSubdirectory.length === 0) { - continue - } + for (let i = 0; i < subdirectories.length; i += 1) { + try { + const subdirectory = subdirectories[i] - const queryResults = - // add a `query_success` row to the result so we know the query ran successfully - // shouldn't need this because sequelize should throw an error, but when deleting - // from disk paranoia is probably justified - ( - await models.sequelize.query( - `(SELECT "multihash" FROM "Files" WHERE "multihash" IN (:cidsInSubdirectory)) - UNION - (SELECT '${DB_QUERY_SUCCESS_CHECK_STR}');`, - { replacements: { cidsInSubdirectory } } - ) - )[0] - - genericLogger.debug( - `diskManager#sweepSubdirectoriesInFiles - iteration ${i} out of ${ - subdirectories.length - }. subdirectory: ${subdirectory}. got ${ - Object.keys(cidsToFilePathMap).length - } files in folder and ${ - queryResults.length - } results from db. files: ${Object.keys( - cidsToFilePathMap - ).toString()}. db records: ${JSON.stringify(queryResults)}` - ) + const cidsToFilePathMap = await listNestedCIDsInFilePath(subdirectory) + const cidsInSubdirectory = Object.keys(cidsToFilePathMap) - const cidsInDB = new Set() - let foundSuccessRow = false - for (const file of queryResults) { - if (file.multihash === `${DB_QUERY_SUCCESS_CHECK_STR}`) - foundSuccessRow = true - else cidsInDB.add(file.multihash) - } + if (cidsInSubdirectory.length === 0) { + continue + } - if (!foundSuccessRow) - throw new Error(`DB did not return expected success row`) - - const cidsToDelete = [] - const cidsNotToDelete = [] - // iterate through all files on disk and check if db contains it - for (const cid of cidsInSubdirectory) { - // if db doesn't contain file, log as okay to delete - if (!cidsInDB.has(cid)) { - cidsToDelete.push(cid) - } else cidsNotToDelete.push(cid) - } + const queryResults = + // add a `query_success` row to the result so we know the query ran successfully + // shouldn't need this because sequelize should throw an error, but when deleting + // from disk paranoia is probably justified + ( + await models.sequelize.query( + `(SELECT "multihash" FROM "Files" WHERE "multihash" IN (:cidsInSubdirectory)) + UNION + (SELECT '${DB_QUERY_SUCCESS_CHECK_STR}');`, + { replacements: { cidsInSubdirectory } } + ) + )[0] - if (cidsNotToDelete.length > 0) { genericLogger.debug( - `diskmanager.js - not safe to delete ${cidsNotToDelete.toString()}` + `diskManager#sweepSubdirectoriesInFiles - iteration ${i} out of ${ + subdirectories.length + }. subdirectory: ${subdirectory}. got ${ + Object.keys(cidsToFilePathMap).length + } files in folder and ${ + queryResults.length + } results from db. files: ${Object.keys( + cidsToFilePathMap + ).toString()}. db records: ${JSON.stringify(queryResults)}` ) - } - if (cidsToDelete.length > 0) { - genericLogger.info( - `diskmanager.js - safe to delete ${cidsToDelete.toString()}` - ) + const cidsInDB = new Set() + let foundSuccessRow = false + for (const file of queryResults) { + if (file.multihash === `${DB_QUERY_SUCCESS_CHECK_STR}`) + foundSuccessRow = true + else cidsInDB.add(file.multihash) + } + + if (!foundSuccessRow) + throw new Error(`DB did not return expected success row`) + + const cidsToDelete = [] + const cidsNotToDelete = [] + // iterate through all files on disk and check if db contains it + for (const cid of cidsInSubdirectory) { + // if db doesn't contain file, log as okay to delete + if (!cidsInDB.has(cid)) { + cidsToDelete.push(cid) + } else cidsNotToDelete.push(cid) + } - // gate deleting files on disk with the same env var - if (config.get('backgroundDiskCleanupDeleteEnabled')) { - await execShellCommand( - `rm ${cidsToDelete.map((cid) => cidsToFilePathMap[cid]).join(' ')}` + if (cidsNotToDelete.length > 0) { + genericLogger.debug( + `diskmanager.js - not safe to delete ${cidsNotToDelete.toString()}` ) } + + if (cidsToDelete.length > 0) { + genericLogger.info( + `diskmanager.js - safe to delete ${cidsToDelete.toString()}` + ) + + // gate deleting files on disk with the same env var + if (config.get('backgroundDiskCleanupDeleteEnabled')) { + await execShellCommand( + `rm ${cidsToDelete + .map((cid) => cidsToFilePathMap[cid]) + .join(' ')}` + ) + } + } + } catch (e: any) { + tracing.recordException(e) + genericLogger.error( + `diskManager#sweepSubdirectoriesInFiles - error: ${e}` + ) } - } catch (e: any) { - tracing.recordException(e) - genericLogger.error( - `diskManager#sweepSubdirectoriesInFiles - error: ${e}` - ) - } - // Wait 10sec between batches to reduce server load - await timeout(10000) - } + // Wait 10sec between batches to reduce server load + await timeout(10000) + } - // keep calling this function recursively without an await so the original function scope can close - // Only call again if backgroundDiskCleanupDeleteEnabled = true, to prevent re-processing infinitely - if (redoJob && config.get('backgroundDiskCleanupDeleteEnabled')) { + // keep calling this function recursively without an await so the original function scope can close + // Only call again if backgroundDiskCleanupDeleteEnabled = true, to prevent re-processing infinitely + if (!redoJob || !config.get('backgroundDiskCleanupDeleteEnabled')) return await timeout(1000) - return sweepSubdirectoriesInFiles() } } @@ -922,20 +925,14 @@ export async function migrateFilesWithNonStandardStoragePaths( prometheusRegistry: any, logger: Logger ): Promise { - if ( - !config.get('migrateFilesWithLegacyStoragePath') && - !config.get('migrateFilesWithCustomStoragePath') - ) { - return - } - const BATCH_SIZE = 5_000 + // infinite while loop with a delay between iterations while (true) { // Reset gauges on each run so the metrics aren't infinitely increasing _resetStoragePathMetrics(prometheusRegistry, logger) - // Legacy storagePaths + // Legacy storagePaths (//CID) if (config.get('migrateFilesWithLegacyStoragePath')) { try { await _migrateNonDirFilesWithLegacyStoragePaths( diff --git a/creator-node/src/index.ts b/creator-node/src/index.ts index 1e6f5989473..743480a7f16 100644 --- a/creator-node/src/index.ts +++ b/creator-node/src/index.ts @@ -131,12 +131,18 @@ const runAsyncBackgroundTasks = async () => { // eslint-disable-next-line @typescript-eslint/no-floating-promises sweepSubdirectoriesInFiles() } - // eslint-disable-next-line @typescript-eslint/no-floating-promises - migrateFilesWithNonStandardStoragePaths( - 500, - serviceRegistry.prometheusRegistry, - logger - ) + + if ( + config.get('migrateFilesWithLegacyStoragePath') || + config.get('migrateFilesWithCustomStoragePath') + ) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + migrateFilesWithNonStandardStoragePaths( + 500, + serviceRegistry.prometheusRegistry, + logger + ) + } } // The primary process performs one-time validation and spawns worker processes that each run the Express app From 3d8d81cf10cc82d273a4694873f269590511f352 Mon Sep 17 00:00:00 2001 From: Sid Sethi Date: Tue, 20 Dec 2022 19:22:38 +0000 Subject: [PATCH 4/4] cmplete --- .../src/utils/cluster/clusterUtilsForPrimary.ts | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/creator-node/src/utils/cluster/clusterUtilsForPrimary.ts b/creator-node/src/utils/cluster/clusterUtilsForPrimary.ts index a9331add25b..b5a8460ca7b 100644 --- a/creator-node/src/utils/cluster/clusterUtilsForPrimary.ts +++ b/creator-node/src/utils/cluster/clusterUtilsForPrimary.ts @@ -50,11 +50,6 @@ class ClusterUtilsForPrimary { prometheusRegistry: any, logger: Logger ) { - logger.debug( - `Creating msgToRecordMetric to all workers from ${JSON.stringify( - metricToRecord - )}` - ) const validatedMetricToRecord = promUtils().validateMetricToRecord(metricToRecord) // Non-cluster mode can just record the metric now @@ -69,11 +64,6 @@ class ClusterUtilsForPrimary { cmd: 'recordMetric', val: validatedMetricToRecord } - logger.debug( - `Sending msgToRecordMetric to all workers: ${JSON.stringify( - msgToRecordMetric - )}` - ) // Send out to all workers. Only the special worker will end up recording it for (const id in cluster.workers) { const worker = cluster.workers?.[id]