Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions creator-node/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict'

const { setupTracing } = require('./tracer')
setupTracing('content-node')

Expand All @@ -12,6 +13,7 @@ const { sequelize } = require('./models')
const { runMigrations, clearRunningQueries } = require('./migrationManager')
const { logger } = require('./logging')
const { serviceRegistry } = require('./serviceRegistry')
const redisClient = require('./redis')

const exitWithError = (...msg: any[]) => {
logger.error('ERROR: ', ...msg)
Expand Down Expand Up @@ -117,6 +119,13 @@ const startApp = async () => {
const appInfo = initializeApp(getPort(), serviceRegistry)
logger.info('Initialized app and server')

// Clear all redis locks
Comment thread
vicky-g marked this conversation as resolved.
try {
await redisClient.WalletWriteLock.clearWriteLocks()
} catch (e: any) {
logger.warn(`Could not clear write locks. Skipping..: ${e.message}`)
}

// Initialize services that do not require the server, but do not need to be awaited.
serviceRegistry.initServicesAsynchronously()

Expand Down
11 changes: 10 additions & 1 deletion creator-node/src/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ const asyncRetry = require('./utils/asyncRetry')

const redisClient = new Redis(config.get('redisPort'), config.get('redisHost'))

const WRITE_WALLET_LOCK_PREFIX = 'WRITE.WALLET.'

const _getWalletWriteLockKey = function (wallet) {
return `WRITE.WALLET.${wallet}`
return `${WRITE_WALLET_LOCK_PREFIX}${wallet}`
}

const WalletWriteLock = {
Expand Down Expand Up @@ -116,6 +118,12 @@ const WalletWriteLock = {
logger: genericLogger,
log: false
})
},

clearWriteLocks: async function () {
await redisClient.deleteAllKeysMatchingPattern(
`${WRITE_WALLET_LOCK_PREFIX}*`
)
}
}

Expand Down Expand Up @@ -152,5 +160,6 @@ const deleteAllKeysMatchingPattern = async function (keyPattern) {
}

redisClient.deleteAllKeysMatchingPattern = deleteAllKeysMatchingPattern

module.exports = redisClient
module.exports.WalletWriteLock = WalletWriteLock
84 changes: 71 additions & 13 deletions creator-node/test/redis.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,44 @@ describe('test Redis client', function () {
assert.equal(await redis.get('key'), 'value')
})

it('Confirms user write locking works', async function() {
it('Confirms user write locking works', async function () {
const wallet = 'wallet'
const defaultExpirationSec = WalletWriteLock.WALLET_WRITE_LOCK_EXPIRATION_SEC
const defaultExpirationSec =
WalletWriteLock.WALLET_WRITE_LOCK_EXPIRATION_SEC
const validAcquirers = WalletWriteLock.VALID_ACQUIRERS

// Confirm expected initial state

assert.equal(await WalletWriteLock.isHeld(wallet), false)

assert.equal(await WalletWriteLock.getCurrentHolder(wallet), null)

// Acquire lock + confirm expected state

assert.doesNotReject(WalletWriteLock.acquire(wallet, validAcquirers.PrimarySyncFromSecondary))
assert.doesNotReject(
WalletWriteLock.acquire(wallet, validAcquirers.PrimarySyncFromSecondary)
)

assert.equal(await WalletWriteLock.ttl(wallet), defaultExpirationSec)

assert.equal(await WalletWriteLock.isHeld(wallet), true)

assert.equal(await WalletWriteLock.getCurrentHolder(wallet), validAcquirers.PrimarySyncFromSecondary)
assert.equal(
await WalletWriteLock.getCurrentHolder(wallet),
validAcquirers.PrimarySyncFromSecondary
)

assert.equal(await WalletWriteLock.syncIsInProgress(wallet), true)

// Confirm acquisition fails when already held

assert.rejects(WalletWriteLock.acquire(wallet, validAcquirers.PrimarySyncFromSecondary), {
name: 'Error',
message: `[acquireWriteLockForWallet][Wallet: ${wallet}] Error: Failed to acquire lock - already held.`
})
assert.rejects(
WalletWriteLock.acquire(wallet, validAcquirers.PrimarySyncFromSecondary),
{
name: 'Error',
message: `[acquireWriteLockForWallet][Wallet: ${wallet}] Error: Failed to acquire lock - already held.`
}
)

// Release lock + confirm expected state

Expand All @@ -64,20 +73,69 @@ describe('test Redis client', function () {

const expirationSec = 1

assert.doesNotReject(WalletWriteLock.acquire(wallet, validAcquirers.SecondarySyncFromPrimary, expirationSec))
assert.doesNotReject(
WalletWriteLock.acquire(
wallet,
validAcquirers.SecondarySyncFromPrimary,
expirationSec
)
)

assert.equal(await WalletWriteLock.ttl(wallet), expirationSec)

assert.equal(await WalletWriteLock.isHeld(wallet), true)

assert.equal(await WalletWriteLock.getCurrentHolder(wallet), validAcquirers.SecondarySyncFromPrimary)
assert.equal(
await WalletWriteLock.getCurrentHolder(wallet),
validAcquirers.SecondarySyncFromPrimary
)

assert.equal(await WalletWriteLock.syncIsInProgress(wallet), true)

// Confirm lock auto-expired after expected expiration time (plus a small buffer)

await utils.timeout(expirationSec * 1000 + 100)

assert.equal(await WalletWriteLock.isHeld(wallet), false)
})
})

it('Clears write locks', async function () {
const wallet1 = 'wallet1'
const wallet2 = 'wallet2'
const wallet3 = 'wallet3'
const wallet4 = 'wallet4'

await WalletWriteLock.acquire(
wallet1,
WalletWriteLock.VALID_ACQUIRERS.PrimarySyncFromSecondary
)

await WalletWriteLock.acquire(
wallet2,
WalletWriteLock.VALID_ACQUIRERS.SecondarySyncFromPrimary
)

await WalletWriteLock.acquire(
wallet3,
WalletWriteLock.VALID_ACQUIRERS.PrimarySyncFromSecondary
)

await WalletWriteLock.acquire(
wallet4,
WalletWriteLock.VALID_ACQUIRERS.SecondarySyncFromPrimary
)

// Check that all the wallets have a sync lock
assert.deepEqual(await WalletWriteLock.syncIsInProgress(wallet1), true)
assert.deepEqual(await WalletWriteLock.syncIsInProgress(wallet2), true)
assert.deepEqual(await WalletWriteLock.syncIsInProgress(wallet3), true)
assert.deepEqual(await WalletWriteLock.syncIsInProgress(wallet4), true)

// Clear all the locks
await WalletWriteLock.clearWriteLocks()
assert.deepEqual(await WalletWriteLock.syncIsInProgress(wallet1), false)
assert.deepEqual(await WalletWriteLock.syncIsInProgress(wallet2), false)
assert.deepEqual(await WalletWriteLock.syncIsInProgress(wallet3), false)
assert.deepEqual(await WalletWriteLock.syncIsInProgress(wallet4), false)
})
})