Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions dash-spv/src/sync/filters/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub struct FiltersManager<
/// Active batches being processed (keyed by start_height).
pub(super) active_batches: BTreeMap<u32, FiltersBatch>,
/// Height that has been committed to wallet (all blocks up to this height processed).
committed_height: u32,
pub(super) committed_height: u32,
/// Current block height being processed (for progress tracking).
processing_height: u32,
/// Blocks remaining that need to be processed.
Expand Down Expand Up @@ -132,10 +132,11 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: Wallet
requests: &RequestSender,
) -> SyncResult<Vec<SyncEvent>> {
self.set_state(SyncState::Syncing);
// Get wallet state
let (wallet_birth_height, wallet_synced_height) = {
// Use filter_committed_height for restart recovery instead of
// synced_height, which advances per-block and may exceed committed scan progress.
let (wallet_birth_height, wallet_committed_height) = {
let wallet = self.wallet.read().await;
(wallet.earliest_required_height().await, wallet.synced_height())
(wallet.earliest_required_height().await, wallet.filter_committed_height())
};

// Get stored filters tip
Expand All @@ -147,8 +148,8 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: Wallet

// Calculate scan start (where we need to start processing)
// Must be at least header_start_height for checkpoint-based sync
let scan_start = if wallet_synced_height > 0 {
wallet_birth_height.max(wallet_synced_height + 1)
let scan_start = if wallet_committed_height > 0 {
wallet_birth_height.max(wallet_committed_height + 1)
} else {
wallet_birth_height
}
Expand All @@ -158,11 +159,11 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: Wallet
if scan_start > self.progress.filter_header_tip_height() {
// Only emit FiltersSyncComplete if we've also reached the chain tip
// This prevents premature sync complete while filter headers are still syncing
if self.progress.current_height() >= self.progress.target_height() {
if self.committed_height >= self.progress.target_height() {
self.set_state(SyncState::Synced);
tracing::info!("Filters already synced to {}", self.progress.target_height());
return Ok(vec![SyncEvent::FiltersSyncComplete {
tip_height: self.progress.current_height(),
tip_height: self.committed_height,
}]);
}
// Caught up to available filter headers but chain tip not reached yet
Expand Down Expand Up @@ -422,15 +423,15 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: Wallet
// updates (already Synced, signal BlocksManager that no more blocks are coming).
if self.active_batches.is_empty()
&& matches!(self.state(), SyncState::Syncing | SyncState::Synced)
&& self.progress.current_height() >= self.progress.filter_header_tip_height()
&& self.progress.current_height() >= self.progress.target_height()
&& self.committed_height >= self.progress.filter_header_tip_height()
&& self.committed_height >= self.progress.target_height()
{
if self.state() == SyncState::Syncing {
self.set_state(SyncState::Synced);
}
tracing::info!("Filter sync complete at height {}", self.progress.current_height());
tracing::info!("Filter sync complete at height {}", self.committed_height);
events.push(SyncEvent::FiltersSyncComplete {
tip_height: self.progress.current_height(),
tip_height: self.committed_height,
});
}

Expand Down Expand Up @@ -498,9 +499,12 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: Wallet

// Commit this batch
let batch = self.active_batches.remove(&batch_start).unwrap();
self.committed_height = batch.end_height();
self.wallet.write().await.update_synced_height(batch.end_height());
self.processing_height = batch.end_height() + 1;
let end = batch.end_height();
if end > self.committed_height {
self.committed_height = end;
self.wallet.write().await.update_filter_committed_height(end);
}
self.processing_height = end + 1;

tracing::info!(
"Committed batch {}-{}, committed_height now {}",
Expand Down
5 changes: 3 additions & 2 deletions dash-spv/src/sync/filters/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ impl<

async fn initialize(&mut self) -> SyncResult<()> {
let wallet = self.wallet.read().await;
let synced_height = wallet.synced_height();
let committed_height = wallet.filter_committed_height();
drop(wallet);

self.progress.update_current_height(synced_height);
self.committed_height = committed_height;
self.progress.update_current_height(committed_height);
self.set_state(SyncState::WaitingForConnections);

tracing::info!(
Expand Down
15 changes: 15 additions & 0 deletions key-wallet-manager/src/wallet_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,21 @@ pub trait WalletInterface: Send + Sync + 'static {
/// Update the wallet's synced height. This also triggers balance updates.
fn update_synced_height(&mut self, height: CoreBlockHeight);

/// Return the height at which filter scanning was last committed.
/// Defaults to `synced_height()` for implementations that don't separate these concepts.
// TODO: This can probably somehow be combined with synced_height().
fn filter_committed_height(&self) -> CoreBlockHeight {
self.synced_height()
}

/// Update the filter committed height. Call when a height is fully processed
/// (including any rescans for newly discovered addresses).
fn update_filter_committed_height(&mut self, height: CoreBlockHeight) {
if height > self.synced_height() {
self.update_synced_height(height);
}
}

/// Provide a human-readable description of the wallet implementation.
///
/// Implementations are encouraged to include high-level state such as the
Expand Down
3 changes: 3 additions & 0 deletions key-wallet-manager/src/wallet_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ pub struct WalletManager<T: WalletInfoInterface = ManagedWalletInfo> {
network: Network,
/// Last fully processed block height.
synced_height: CoreBlockHeight,
/// Height at which filter scanning was last committed.
filter_committed_height: CoreBlockHeight,
/// Immutable wallets indexed by wallet ID
wallets: BTreeMap<WalletId, Wallet>,
/// Mutable wallet info indexed by wallet ID
Expand All @@ -95,6 +97,7 @@ impl<T: WalletInfoInterface> WalletManager<T> {
Self {
network,
synced_height: 0,
filter_committed_height: 0,
wallets: BTreeMap::new(),
wallet_infos: BTreeMap::new(),
#[cfg(feature = "std")]
Expand Down
11 changes: 11 additions & 0 deletions key-wallet-manager/src/wallet_manager/process_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,17 @@ impl<T: WalletInfoInterface + Send + Sync + 'static> WalletInterface for WalletM
}
}

fn filter_committed_height(&self) -> CoreBlockHeight {
self.filter_committed_height
}

fn update_filter_committed_height(&mut self, height: CoreBlockHeight) {
self.filter_committed_height = height;
if height > self.synced_height {
self.update_synced_height(height);
}
}

async fn describe(&self) -> String {
let wallet_count = self.wallet_infos.len();
if wallet_count == 0 {
Expand Down
Loading