From ca6e318fe7c9fec5d0fbb08d09d661186689452e Mon Sep 17 00:00:00 2001 From: Theo Ilie Date: Thu, 4 Aug 2022 20:18:21 +0000 Subject: [PATCH 1/2] Make syncs release locks when short circuiting --- .../services/sync/secondarySyncFromPrimary.js | 78 ++++++++++++------- 1 file changed, 49 insertions(+), 29 deletions(-) diff --git a/creator-node/src/services/sync/secondarySyncFromPrimary.js b/creator-node/src/services/sync/secondarySyncFromPrimary.js index cb77fa37b39..5a3b625e7b5 100644 --- a/creator-node/src/services/sync/secondarySyncFromPrimary.js +++ b/creator-node/src/services/sync/secondarySyncFromPrimary.js @@ -9,6 +9,19 @@ const SyncHistoryAggregator = require('../../snapbackSM/syncHistoryAggregator') const DBManager = require('../../dbManager') const UserSyncFailureCountManager = require('./UserSyncFailureCountManager') +const releaseRedisLocks = async (wallets, redis, logPrefix) => { + for (const wallet of wallets) { + try { + await redis.WalletWriteLock.release(wallet) + } catch (e) { + logger.warn( + logPrefix, + `Failure to release write lock for ${wallet} with error ${e.message}` + ) + } + } +} + const handleSyncFromPrimary = async ( serviceRegistry, walletPublicKeys, @@ -25,6 +38,7 @@ const handleSyncFromPrimary = async ( ) const start = Date.now() + let returnValue = {} logger.info('begin nodesync for ', walletPublicKeys, 'time', start) @@ -47,6 +61,8 @@ const handleSyncFromPrimary = async ( ), result: 'failure_sync_in_progress' } + } finally { + await releaseRedisLocks(walletPublicKeys, redis, logPrefix) } } @@ -102,10 +118,11 @@ const handleSyncFromPrimary = async ( `Failed to retrieve export from ${creatorNodeEndpoint} for wallets`, walletPublicKeys ) - return { + returnValue = { error: new Error(resp.data.error), result: 'failure_export_wallet' } + throw returnValue.error } // TODO - explain patch @@ -113,19 +130,21 @@ const handleSyncFromPrimary = async ( if (resp.request && resp.request.responseText) { resp.data = JSON.parse(resp.request.responseText) } else { - return { + returnValue = { error: new Error(`Malformed response from ${creatorNodeEndpoint}.`), result: 'failure_malformed_export' } + throw returnValue.error } } const { data: body } = resp if (!body.data.hasOwnProperty('cnodeUsers')) { - return { + returnValue = { error: new Error(`Malformed response from ${creatorNodeEndpoint}.`), result: 'failure_malformed_export' } + throw returnValue.error } logger.info( @@ -143,12 +162,13 @@ const handleSyncFromPrimary = async ( // Since different nodes may assign different cnodeUserUUIDs to a given walletPublicKey, // retrieve local cnodeUserUUID from fetched walletPublicKey and delete all associated data. if (!fetchedCNodeUser.hasOwnProperty('walletPublicKey')) { - return { + returnValue = { error: new Error( `Malformed response received from ${creatorNodeEndpoint}. "walletPublicKey" property not found on CNodeUser in response object` ), result: 'failure_malformed_export' } + throw returnValue.error } const fetchedWalletPublicKey = fetchedCNodeUser.walletPublicKey @@ -182,12 +202,13 @@ const handleSyncFromPrimary = async ( } if (!walletPublicKeys.includes(fetchedWalletPublicKey)) { - return { + returnValue = { error: new Error( `Malformed response from ${creatorNodeEndpoint}. Returned data for walletPublicKey that was not requested.` ), result: 'failure_malformed_export' } + throw returnValue.error } /** @@ -207,12 +228,13 @@ const handleSyncFromPrimary = async ( // Error if returned data is not within requested range if (fetchedLatestClockVal < localMaxClockVal) { - return { + returnValue = { error: new Error( `Cannot sync for localMaxClockVal ${localMaxClockVal} - imported data has max clock val ${fetchedLatestClockVal}` ), result: 'failure_inconsistent_clock' } + throw returnValue.error } else if (fetchedLatestClockVal === localMaxClockVal) { // Already up to date, no sync necessary logger.info( @@ -225,22 +247,24 @@ const handleSyncFromPrimary = async ( fetchedClockRecords[0] && fetchedClockRecords[0].clock !== localMaxClockVal + 1 ) { - return { + returnValue = { error: new Error( `Cannot sync - imported data is not contiguous. Local max clock val = ${localMaxClockVal} and imported min clock val ${fetchedClockRecords[0].clock}` ), result: 'failure_import_not_contiguous' } + throw returnValue.error } else if ( !_.isEmpty(fetchedCNodeUser.clockRecords) && maxClockRecordId !== fetchedLatestClockVal ) { - return { + returnValue = { error: new Error( `Cannot sync - imported data is not consistent. Imported max clock val = ${fetchedLatestClockVal} and imported max ClockRecord val ${maxClockRecordId}` ), result: 'failure_import_not_consistent' } + throw returnValue.error } // All DB updates must happen in single atomic tx - partial state updates will lead to data loss @@ -302,12 +326,13 @@ const handleSyncFromPrimary = async ( // Error if update failed if (numRowsUpdated !== 1 || respObj.length !== 1) { - return { + returnValue = { error: new Error( `Failed to update cnodeUser row for cnodeUser wallet ${fetchedWalletPublicKey}` ), result: 'failure_db_transaction' } + throw returnValue.error } cnodeUser = respObj[0] } else { @@ -468,10 +493,11 @@ const handleSyncFromPrimary = async ( if (userSyncFailureCount < SyncRequestMaxUserFailureCountBeforeSkip) { const errorMsg = `User Sync failed due to ${numCIDsThatFailedSaveFileOp} failing saveFileForMultihashToFS op. userSyncFailureCount = ${userSyncFailureCount} // SyncRequestMaxUserFailureCountBeforeSkip = ${SyncRequestMaxUserFailureCountBeforeSkip}` logger.error(logPrefix, errorMsg) - return { + returnValue = { error: new Error(errorMsg), result: 'failure_skip_threshold_not_reached' } + throw returnValue.error // If max failure threshold reached, continue with sync and reset failure count } else { @@ -570,10 +596,12 @@ const handleSyncFromPrimary = async ( await DBManager.fixInconsistentUser(fetchedCNodeUser.cnodeUserUUID) - return { - error: new Error(e), - result: 'failure_db_transaction' - } + return _.isEmpty(returnValue) + ? { + error: new Error(e), + result: 'failure_db_transaction' + } + : returnValue } } } catch (e) { @@ -589,22 +617,14 @@ const handleSyncFromPrimary = async ( }. From endpoint ${creatorNodeEndpoint}.` ) - return { - error: new Error(e), - result: 'failure_sync_secondary_from_primary' - } + return _.isEmpty(returnValue) + ? { + error: new Error(e), + result: 'failure_sync_secondary_from_primary' + } + : returnValue } finally { - // Release all redis locks - for (const wallet of walletPublicKeys) { - try { - await redis.WalletWriteLock.release(wallet) - } catch (e) { - logger.warn( - logPrefix, - `Failure to release write lock for ${wallet} with error ${e.message}` - ) - } - } + await releaseRedisLocks(walletPublicKeys, redis, logPrefix) } logger.info( From 1f2a47cefc6d55a2e155822113d53feb2a1a328b Mon Sep 17 00:00:00 2001 From: Theo Ilie Date: Thu, 4 Aug 2022 21:19:57 +0000 Subject: [PATCH 2/2] Revert redis lock edge case release --- .../services/sync/secondarySyncFromPrimary.js | 27 ++++++++----------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/creator-node/src/services/sync/secondarySyncFromPrimary.js b/creator-node/src/services/sync/secondarySyncFromPrimary.js index 5a3b625e7b5..82385610617 100644 --- a/creator-node/src/services/sync/secondarySyncFromPrimary.js +++ b/creator-node/src/services/sync/secondarySyncFromPrimary.js @@ -9,19 +9,6 @@ const SyncHistoryAggregator = require('../../snapbackSM/syncHistoryAggregator') const DBManager = require('../../dbManager') const UserSyncFailureCountManager = require('./UserSyncFailureCountManager') -const releaseRedisLocks = async (wallets, redis, logPrefix) => { - for (const wallet of wallets) { - try { - await redis.WalletWriteLock.release(wallet) - } catch (e) { - logger.warn( - logPrefix, - `Failure to release write lock for ${wallet} with error ${e.message}` - ) - } - } -} - const handleSyncFromPrimary = async ( serviceRegistry, walletPublicKeys, @@ -61,8 +48,6 @@ const handleSyncFromPrimary = async ( ), result: 'failure_sync_in_progress' } - } finally { - await releaseRedisLocks(walletPublicKeys, redis, logPrefix) } } @@ -624,7 +609,17 @@ const handleSyncFromPrimary = async ( } : returnValue } finally { - await releaseRedisLocks(walletPublicKeys, redis, logPrefix) + // Release all redis locks + for (const wallet of walletPublicKeys) { + try { + await redis.WalletWriteLock.release(wallet) + } catch (e) { + logger.warn( + logPrefix, + `Failure to release write lock for ${wallet} with error ${e.message}` + ) + } + } } logger.info(