From b439963d22dde42299f721ba5359357bf197cb27 Mon Sep 17 00:00:00 2001 From: vicky Date: Thu, 1 Sep 2022 18:35:28 +0000 Subject: [PATCH 1/4] clear locks on init --- creator-node/src/index.ts | 9 ++++ creator-node/src/redis.js | 75 ++++++++++++++++------------- creator-node/test/redis.test.js | 84 ++++++++++++++++++++++++++++----- 3 files changed, 122 insertions(+), 46 deletions(-) diff --git a/creator-node/src/index.ts b/creator-node/src/index.ts index ac37dd26903..9e5a74ba6e5 100644 --- a/creator-node/src/index.ts +++ b/creator-node/src/index.ts @@ -1,4 +1,5 @@ 'use strict' + const { setupTracing } = require('./tracer') setupTracing('content-node') @@ -12,6 +13,7 @@ const { sequelize } = require('./models') const { runMigrations, clearRunningQueries } = require('./migrationManager') const { logger } = require('./logging') const { serviceRegistry } = require('./serviceRegistry') +const redisClient = require('./redis') const exitWithError = (...msg: any[]) => { logger.error('ERROR: ', ...msg) @@ -124,6 +126,13 @@ const startApp = async () => { // No need to await on this as this process can take a while and can run in the background serviceRegistry.initServicesThatRequireServer(appInfo.app) + // Clear all redis locks + try { + await redisClient.WalletWriteLock.clearWriteLocks() + } catch (e: any) { + logger.warn(`Could not clear write locks. Skipping..: ${e.message}`) + } + // when app terminates, close down any open DB connections gracefully ON_DEATH((signal: any, error: any) => { // NOTE: log messages emitted here may be swallowed up if using the bunyan CLI (used by diff --git a/creator-node/src/redis.js b/creator-node/src/redis.js index 1b309def872..7a2fadc1077 100644 --- a/creator-node/src/redis.js +++ b/creator-node/src/redis.js @@ -10,8 +10,42 @@ const asyncRetry = require('./utils/asyncRetry') const redisClient = new Redis(config.get('redisPort'), config.get('redisHost')) +const WRITE_WALLET_LOCK_PREFIX = 'WRITE.WALLET.' + +/** + * Deletes keys of a pattern: https://stackoverflow.com/a/36006360 + * @param keyPattern the redis key pattern that matches keys to remove + * @return {Number} numDeleted number of redis keys deleted + */ +const deleteAllKeysMatchingPattern = async function (keyPattern) { + // Create a readable stream (object mode) + const stream = redisClient.scanStream({ + match: keyPattern + }) + const deletedKeysSet = new Set() + return new Promise((resolve, reject) => { + stream.on('data', function (keys) { + // `keys` is an array of strings representing key names + if (keys.length) { + const pipeline = redisClient.pipeline() + keys.forEach(function (key) { + pipeline.del(key) + deletedKeysSet.add(key) + }) + pipeline.exec() + } + }) + stream.on('end', function () { + resolve(deletedKeysSet.size) + }) + stream.on('error', function (e) { + reject(e) + }) + }) +} + const _getWalletWriteLockKey = function (wallet) { - return `WRITE.WALLET.${wallet}` + return `${WRITE_WALLET_LOCK_PREFIX}${wallet}` } const WalletWriteLock = { @@ -116,41 +150,16 @@ const WalletWriteLock = { logger: genericLogger, log: false }) - } -} + }, -/** - * Deletes keys of a pattern: https://stackoverflow.com/a/36006360 - * @param keyPattern the redis key pattern that matches keys to remove - * @return {Number} numDeleted number of redis keys deleted - */ -const deleteAllKeysMatchingPattern = async function (keyPattern) { - // Create a readable stream (object mode) - const stream = redisClient.scanStream({ - match: keyPattern - }) - const deletedKeysSet = new Set() - return new Promise((resolve, reject) => { - stream.on('data', function (keys) { - // `keys` is an array of strings representing key names - if (keys.length) { - const pipeline = redisClient.pipeline() - keys.forEach(function (key) { - pipeline.del(key) - deletedKeysSet.add(key) - }) - pipeline.exec() - } - }) - stream.on('end', function () { - resolve(deletedKeysSet.size) - }) - stream.on('error', function (e) { - reject(e) - }) - }) + clearWriteLocks: async function () { + await redisClient.deleteAllKeysMatchingPattern( + `${WRITE_WALLET_LOCK_PREFIX}*` + ) + } } redisClient.deleteAllKeysMatchingPattern = deleteAllKeysMatchingPattern + module.exports = redisClient module.exports.WalletWriteLock = WalletWriteLock diff --git a/creator-node/test/redis.test.js b/creator-node/test/redis.test.js index b0f00b61786..ef9c5898377 100644 --- a/creator-node/test/redis.test.js +++ b/creator-node/test/redis.test.js @@ -20,35 +20,44 @@ describe('test Redis client', function () { assert.equal(await redis.get('key'), 'value') }) - it('Confirms user write locking works', async function() { + it('Confirms user write locking works', async function () { const wallet = 'wallet' - const defaultExpirationSec = WalletWriteLock.WALLET_WRITE_LOCK_EXPIRATION_SEC + const defaultExpirationSec = + WalletWriteLock.WALLET_WRITE_LOCK_EXPIRATION_SEC const validAcquirers = WalletWriteLock.VALID_ACQUIRERS // Confirm expected initial state - + assert.equal(await WalletWriteLock.isHeld(wallet), false) assert.equal(await WalletWriteLock.getCurrentHolder(wallet), null) // Acquire lock + confirm expected state - assert.doesNotReject(WalletWriteLock.acquire(wallet, validAcquirers.PrimarySyncFromSecondary)) + assert.doesNotReject( + WalletWriteLock.acquire(wallet, validAcquirers.PrimarySyncFromSecondary) + ) assert.equal(await WalletWriteLock.ttl(wallet), defaultExpirationSec) assert.equal(await WalletWriteLock.isHeld(wallet), true) - assert.equal(await WalletWriteLock.getCurrentHolder(wallet), validAcquirers.PrimarySyncFromSecondary) + assert.equal( + await WalletWriteLock.getCurrentHolder(wallet), + validAcquirers.PrimarySyncFromSecondary + ) assert.equal(await WalletWriteLock.syncIsInProgress(wallet), true) // Confirm acquisition fails when already held - assert.rejects(WalletWriteLock.acquire(wallet, validAcquirers.PrimarySyncFromSecondary), { - name: 'Error', - message: `[acquireWriteLockForWallet][Wallet: ${wallet}] Error: Failed to acquire lock - already held.` - }) + assert.rejects( + WalletWriteLock.acquire(wallet, validAcquirers.PrimarySyncFromSecondary), + { + name: 'Error', + message: `[acquireWriteLockForWallet][Wallet: ${wallet}] Error: Failed to acquire lock - already held.` + } + ) // Release lock + confirm expected state @@ -64,20 +73,69 @@ describe('test Redis client', function () { const expirationSec = 1 - assert.doesNotReject(WalletWriteLock.acquire(wallet, validAcquirers.SecondarySyncFromPrimary, expirationSec)) + assert.doesNotReject( + WalletWriteLock.acquire( + wallet, + validAcquirers.SecondarySyncFromPrimary, + expirationSec + ) + ) assert.equal(await WalletWriteLock.ttl(wallet), expirationSec) assert.equal(await WalletWriteLock.isHeld(wallet), true) - assert.equal(await WalletWriteLock.getCurrentHolder(wallet), validAcquirers.SecondarySyncFromPrimary) + assert.equal( + await WalletWriteLock.getCurrentHolder(wallet), + validAcquirers.SecondarySyncFromPrimary + ) assert.equal(await WalletWriteLock.syncIsInProgress(wallet), true) // Confirm lock auto-expired after expected expiration time (plus a small buffer) - + await utils.timeout(expirationSec * 1000 + 100) assert.equal(await WalletWriteLock.isHeld(wallet), false) }) -}) \ No newline at end of file + + it.only('Clears write locks', async function () { + const wallet1 = 'wallet1' + const wallet2 = 'wallet2' + const wallet3 = 'wallet3' + const wallet4 = 'wallet4' + + await WalletWriteLock.acquire( + wallet1, + WalletWriteLock.VALID_ACQUIRERS.PrimarySyncFromSecondary + ) + + await WalletWriteLock.acquire( + wallet2, + WalletWriteLock.VALID_ACQUIRERS.SecondarySyncFromPrimary + ) + + await WalletWriteLock.acquire( + wallet3, + WalletWriteLock.VALID_ACQUIRERS.PrimarySyncFromSecondary + ) + + await WalletWriteLock.acquire( + wallet4, + WalletWriteLock.VALID_ACQUIRERS.SecondarySyncFromPrimary + ) + + // Check that all the wallets have a sync lock + assert.deepEqual(await WalletWriteLock.syncIsInProgress(wallet1), true) + assert.deepEqual(await WalletWriteLock.syncIsInProgress(wallet2), true) + assert.deepEqual(await WalletWriteLock.syncIsInProgress(wallet3), true) + assert.deepEqual(await WalletWriteLock.syncIsInProgress(wallet4), true) + + // Clear all the locks + await WalletWriteLock.clearWriteLocks() + assert.deepEqual(await WalletWriteLock.syncIsInProgress(wallet1), false) + assert.deepEqual(await WalletWriteLock.syncIsInProgress(wallet2), false) + assert.deepEqual(await WalletWriteLock.syncIsInProgress(wallet3), false) + assert.deepEqual(await WalletWriteLock.syncIsInProgress(wallet4), false) + }) +}) From 1ee227d313ffb7e7cb9040fad8bac3d052ec5751 Mon Sep 17 00:00:00 2001 From: vicky Date: Thu, 1 Sep 2022 18:37:20 +0000 Subject: [PATCH 2/4] remove .only --- creator-node/test/redis.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/creator-node/test/redis.test.js b/creator-node/test/redis.test.js index ef9c5898377..14f041f3df9 100644 --- a/creator-node/test/redis.test.js +++ b/creator-node/test/redis.test.js @@ -99,7 +99,7 @@ describe('test Redis client', function () { assert.equal(await WalletWriteLock.isHeld(wallet), false) }) - it.only('Clears write locks', async function () { + it('Clears write locks', async function () { const wallet1 = 'wallet1' const wallet2 = 'wallet2' const wallet3 = 'wallet3' From b62a8c9c967ada71fde19f06d5f0a32b7a9b7910 Mon Sep 17 00:00:00 2001 From: vicky Date: Thu, 1 Sep 2022 18:39:22 +0000 Subject: [PATCH 3/4] move to end of file where it originally wwas --- creator-node/src/redis.js | 64 +++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/creator-node/src/redis.js b/creator-node/src/redis.js index 7a2fadc1077..897ef3c9509 100644 --- a/creator-node/src/redis.js +++ b/creator-node/src/redis.js @@ -12,38 +12,6 @@ const redisClient = new Redis(config.get('redisPort'), config.get('redisHost')) const WRITE_WALLET_LOCK_PREFIX = 'WRITE.WALLET.' -/** - * Deletes keys of a pattern: https://stackoverflow.com/a/36006360 - * @param keyPattern the redis key pattern that matches keys to remove - * @return {Number} numDeleted number of redis keys deleted - */ -const deleteAllKeysMatchingPattern = async function (keyPattern) { - // Create a readable stream (object mode) - const stream = redisClient.scanStream({ - match: keyPattern - }) - const deletedKeysSet = new Set() - return new Promise((resolve, reject) => { - stream.on('data', function (keys) { - // `keys` is an array of strings representing key names - if (keys.length) { - const pipeline = redisClient.pipeline() - keys.forEach(function (key) { - pipeline.del(key) - deletedKeysSet.add(key) - }) - pipeline.exec() - } - }) - stream.on('end', function () { - resolve(deletedKeysSet.size) - }) - stream.on('error', function (e) { - reject(e) - }) - }) -} - const _getWalletWriteLockKey = function (wallet) { return `${WRITE_WALLET_LOCK_PREFIX}${wallet}` } @@ -159,6 +127,38 @@ const WalletWriteLock = { } } +/** + * Deletes keys of a pattern: https://stackoverflow.com/a/36006360 + * @param keyPattern the redis key pattern that matches keys to remove + * @return {Number} numDeleted number of redis keys deleted + */ +const deleteAllKeysMatchingPattern = async function (keyPattern) { + // Create a readable stream (object mode) + const stream = redisClient.scanStream({ + match: keyPattern + }) + const deletedKeysSet = new Set() + return new Promise((resolve, reject) => { + stream.on('data', function (keys) { + // `keys` is an array of strings representing key names + if (keys.length) { + const pipeline = redisClient.pipeline() + keys.forEach(function (key) { + pipeline.del(key) + deletedKeysSet.add(key) + }) + pipeline.exec() + } + }) + stream.on('end', function () { + resolve(deletedKeysSet.size) + }) + stream.on('error', function (e) { + reject(e) + }) + }) +} + redisClient.deleteAllKeysMatchingPattern = deleteAllKeysMatchingPattern module.exports = redisClient From 2b2823b32b6407681f501dae352e99e3f6d76ccf Mon Sep 17 00:00:00 2001 From: Vicky Date: Thu, 1 Sep 2022 12:12:28 -0700 Subject: [PATCH 4/4] move clearing up --- creator-node/src/index.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/creator-node/src/index.ts b/creator-node/src/index.ts index 9e5a74ba6e5..57db5d354b4 100644 --- a/creator-node/src/index.ts +++ b/creator-node/src/index.ts @@ -119,13 +119,6 @@ const startApp = async () => { const appInfo = initializeApp(getPort(), serviceRegistry) logger.info('Initialized app and server') - // Initialize services that do not require the server, but do not need to be awaited. - serviceRegistry.initServicesAsynchronously() - - // Some Services cannot start until server is up. Start them now - // No need to await on this as this process can take a while and can run in the background - serviceRegistry.initServicesThatRequireServer(appInfo.app) - // Clear all redis locks try { await redisClient.WalletWriteLock.clearWriteLocks() @@ -133,6 +126,13 @@ const startApp = async () => { logger.warn(`Could not clear write locks. Skipping..: ${e.message}`) } + // Initialize services that do not require the server, but do not need to be awaited. + serviceRegistry.initServicesAsynchronously() + + // Some Services cannot start until server is up. Start them now + // No need to await on this as this process can take a while and can run in the background + serviceRegistry.initServicesThatRequireServer(appInfo.app) + // when app terminates, close down any open DB connections gracefully ON_DEATH((signal: any, error: any) => { // NOTE: log messages emitted here may be swallowed up if using the bunyan CLI (used by