From 9375a667dcbfe8dbce09eef9d73f3506def5f3f4 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Mon, 16 Feb 2026 12:16:59 +0100 Subject: [PATCH 01/23] fix(spv): exit manager task loop on network errors and signal shutdown before disconnect The tick branch in SyncManager::run() now breaks on SyncError::Network instead of logging indefinitely, preventing ~26s of channel-closed spam during shutdown. SyncCoordinator gains a signal_shutdown() method that cancels the shared token without awaiting joins, and DashSpvClient::stop() calls it before network.disconnect() so managers observe cancellation promptly. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Lukasz Klimek <842586+lklimek@users.noreply.github.com> --- dash-spv/src/client/lifecycle.rs | 4 + dash-spv/src/sync/sync_coordinator.rs | 10 +++ dash-spv/src/sync/sync_manager.rs | 122 ++++++++++++++++++++++++++ 3 files changed, 136 insertions(+) diff --git a/dash-spv/src/client/lifecycle.rs b/dash-spv/src/client/lifecycle.rs index 1379972a3..e319d0f99 100644 --- a/dash-spv/src/client/lifecycle.rs +++ b/dash-spv/src/client/lifecycle.rs @@ -193,6 +193,10 @@ impl DashSpvClient SyncResult<()> { tracing::info!("Shutting down SyncCoordinator"); diff --git a/dash-spv/src/sync/sync_manager.rs b/dash-spv/src/sync/sync_manager.rs index 7954a1d4f..4c54a46cd 100644 --- a/dash-spv/src/sync/sync_manager.rs +++ b/dash-spv/src/sync/sync_manager.rs @@ -309,6 +309,10 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } self.try_emit_progress(progress_before, &context.progress_sender); } + Err(crate::error::SyncError::Network(ref msg)) => { + tracing::warn!("{} tick network error, exiting: {}", identifier, msg); + break; + } Err(e) => { tracing::error!("{} tick error: {}", identifier, e); } @@ -447,4 +451,122 @@ mod tests { // Verify tick was called multiple times assert!(tick_count.load(Ordering::Relaxed) > 0); } + + /// Mock manager whose tick() returns SyncError::Network after a threshold. 
+ struct NetworkErrorManager { + identifier: ManagerIdentifier, + state: SyncState, + tick_count: Arc<AtomicU32>, + error_after: u32, + } + + impl std::fmt::Debug for NetworkErrorManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NetworkErrorManager") + .field("identifier", &self.identifier) + .finish() + } + } + + #[async_trait] + impl SyncManager for NetworkErrorManager { + fn identifier(&self) -> ManagerIdentifier { + self.identifier + } + + fn state(&self) -> SyncState { + self.state + } + + fn set_state(&mut self, state: SyncState) { + self.state = state; + } + + fn wanted_message_types(&self) -> &'static [MessageType] { + &[] + } + + async fn handle_message( + &mut self, + _msg: Message, + _requests: &RequestSender, + ) -> SyncResult<Vec<SyncEvent>> { + Ok(vec![]) + } + + async fn handle_sync_event( + &mut self, + _event: &SyncEvent, + _requests: &RequestSender, + ) -> SyncResult<Vec<SyncEvent>> { + Ok(vec![]) + } + + async fn tick( + &mut self, + _requests: &RequestSender, + ) -> SyncResult<Vec<SyncEvent>> { + let count = self.tick_count.fetch_add(1, Ordering::Relaxed); + if count >= self.error_after { + Err(crate::error::SyncError::Network("channel closed".into())) + } else { + Ok(vec![]) + } + } + + fn progress(&self) -> SyncManagerProgress { + let mut progress = BlockHeadersProgress::default(); + progress.set_state(self.state); + SyncManagerProgress::BlockHeaders(progress) + } + } + + /// Given a manager whose tick() returns SyncError::Network after a few calls, + /// When the task loop processes the error, + /// Then it exits promptly without requiring a shutdown signal. + #[tokio::test] + async fn test_manager_exits_on_tick_network_error() { + let tick_count = Arc::new(AtomicU32::new(0)); + + let manager = NetworkErrorManager { + identifier: ManagerIdentifier::BlockHeader, + state: SyncState::Initializing, + tick_count: tick_count.clone(), + error_after: 3, + }; + + // Create channels + let (_, message_receiver) = mpsc::unbounded_channel(); + let sync_event_sender = broadcast::Sender::<SyncEvent>::new(100); + let network_event_sender = broadcast::Sender::<NetworkEvent>::new(100); + let (req_tx, _req_rx) = mpsc::unbounded_channel::<NetworkRequest>(); + let requests = RequestSender::new(req_tx); + let shutdown = CancellationToken::new(); + let (progress_sender, _progress_rx) = watch::channel(manager.progress()); + + let context = SyncManagerTaskContext { + message_receiver, + sync_event_sender, + network_event_receiver: network_event_sender.subscribe(), + requests, + shutdown: shutdown.clone(), + progress_sender, + }; + + // Spawn the task — it should exit on its own when tick returns Network error + let handle = tokio::spawn(async move { manager.run(context).await }); + + // Wait for the task to complete with a timeout (should be fast) + let result = tokio::time::timeout(Duration::from_secs(2), handle) + .await + .expect("task should exit promptly on network error") + .unwrap(); + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), ManagerIdentifier::BlockHeader); + + // Verify tick was called at least error_after + 1 times + // (the error-producing call counts too) + assert!(tick_count.load(Ordering::Relaxed) > 3); + } } From d8bc066f1e767d058c394916c98699ac972eaba0 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Mon, 16 Feb 2026 12:34:48 +0100 Subject: [PATCH 02/23] fix(dash-spv): shutdown token not checked when waiting for peer connection --- dash-spv/src/network/manager.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git
a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index d580459dd..802076479 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -245,7 +245,15 @@ impl PeerNetworkManager { tasks.spawn(async move { log::debug!("Attempting to connect to {}", addr); - match Peer::connect(addr, CONNECTION_TIMEOUT.as_secs(), network).await { + let connect_result = tokio::select! { + result = Peer::connect(addr, CONNECTION_TIMEOUT.as_secs(), network) => result, + _ = shutdown_token.cancelled() => { + log::debug!("Connection to {} cancelled by shutdown", addr); + return; + } + }; + + match connect_result { Ok(mut peer) => { // Perform handshake let mut handshake_manager = From cdccab6c7ed7757f251118188bb0ef680db2ca74 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Mon, 16 Feb 2026 13:03:44 +0100 Subject: [PATCH 03/23] fix(spv): address review findings for network error handling - Emit SyncEvent::ManagerError before breaking on network tick errors so the coordinator has visibility into why a manager stopped - Clear pool connecting state when shutdown cancels a peer connection - Import SyncError and use short form instead of fully-qualified path - Document SyncError::Network variant's tick() exit semantics Co-Authored-By: Claude Opus 4.6 --- dash-spv/src/error.rs | 7 ++++++- dash-spv/src/network/manager.rs | 1 + dash-spv/src/sync/sync_manager.rs | 10 +++++++--- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/dash-spv/src/error.rs b/dash-spv/src/error.rs index eff214f6f..d314f99ee 100644 --- a/dash-spv/src/error.rs +++ b/dash-spv/src/error.rs @@ -210,7 +210,12 @@ pub enum SyncError { #[error("Timeout error: {0}")] Timeout(String), - /// Network-related errors (e.g., connection failures, protocol errors) + /// Network-related errors (e.g., connection failures, protocol errors). + /// + /// **Important:** When returned from [`SyncManager::tick()`], this variant causes the + /// manager task loop to exit immediately. Only return it from `tick()` for fatal, + /// unrecoverable conditions (e.g., the request channel is closed). For transient + /// network issues use [`SyncError::Timeout`] or handle them with internal retry logic. 
#[error("Network error: {0}")] Network(String), diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index 802076479..94d3b3369 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -249,6 +249,7 @@ impl PeerNetworkManager { result = Peer::connect(addr, CONNECTION_TIMEOUT.as_secs(), network) => result, _ = shutdown_token.cancelled() => { log::debug!("Connection to {} cancelled by shutdown", addr); + pool.remove_peer(&addr).await; return; } }; diff --git a/dash-spv/src/sync/sync_manager.rs b/dash-spv/src/sync/sync_manager.rs index 4c54a46cd..09f8861f6 100644 --- a/dash-spv/src/sync/sync_manager.rs +++ b/dash-spv/src/sync/sync_manager.rs @@ -1,4 +1,4 @@ -use crate::error::SyncResult; +use crate::error::{SyncError, SyncResult}; use crate::network::{Message, MessageType, NetworkEvent, RequestSender}; use crate::sync::{ BlockHeadersProgress, BlocksProgress, ChainLockProgress, FilterHeadersProgress, @@ -309,8 +309,12 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } self.try_emit_progress(progress_before, &context.progress_sender); } - Err(crate::error::SyncError::Network(ref msg)) => { + Err(SyncError::Network(ref msg)) => { tracing::warn!("{} tick network error, exiting: {}", identifier, msg); + context.emit_sync_event(SyncEvent::ManagerError { + manager: identifier, + error: format!("Network error (exiting): {}", msg), + }); break; } Err(e) => { @@ -508,7 +512,7 @@ mod tests { ) -> SyncResult> { let count = self.tick_count.fetch_add(1, Ordering::Relaxed); if count >= self.error_after { - Err(crate::error::SyncError::Network("channel closed".into())) + Err(SyncError::Network("channel closed".into())) } else { Ok(vec![]) } From 0a6dff11d198fdd47ebbcc7a9f99142d0c2d8bd7 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Mon, 16 Feb 2026 13:10:07 +0100 Subject: [PATCH 04/23] refactor(spv): replace signal_shutdown with full shutdown in stop() Await sync_coordinator.shutdown() in stop() to drain manager tasks before tearing down network and storage. Remove the now-unused signal_shutdown() method. Co-Authored-By: Claude Opus 4.6 --- dash-spv/src/client/lifecycle.rs | 8 +++++--- dash-spv/src/sync/sync_coordinator.rs | 10 ---------- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/dash-spv/src/client/lifecycle.rs b/dash-spv/src/client/lifecycle.rs index e319d0f99..5adf7e998 100644 --- a/dash-spv/src/client/lifecycle.rs +++ b/dash-spv/src/client/lifecycle.rs @@ -193,9 +193,11 @@ impl DashSpvClient SyncResult<()> { tracing::info!("Shutting down SyncCoordinator"); From 9f3970ccc623a1f23523ada1f0039cf9a14193ea Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Mon, 16 Feb 2026 15:14:04 +0100 Subject: [PATCH 05/23] fix(spv): resolve deadlock in PeerNetworkManager shutdown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PeerNetworkManager::shutdown() held self.tasks lock while draining the JoinSet. The maintenance loop task, being drained, called connect_to_peer() which also needed self.tasks lock — classic deadlock. Fix with two changes: - shutdown(): use std::mem::take to extract JoinSet from mutex before draining, so the lock is released immediately - connect_to_peer(): wrap lock acquisition with tokio::select! 
against shutdown token so it returns immediately during shutdown Co-Authored-By: Claude Opus 4.6 --- dash-spv/src/network/manager.rs | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index 94d3b3369..c1f58afee 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -240,8 +240,14 @@ impl PeerNetworkManager { let message_dispatcher = self.message_dispatcher.clone(); let network_event_sender = self.network_event_sender.clone(); - // Spawn connection task - let mut tasks = self.tasks.lock().await; + // Spawn connection task — use select to avoid blocking on the lock during shutdown + let mut tasks = tokio::select! { + guard = self.tasks.lock() => guard, + _ = self.shutdown_token.cancelled() => { + self.pool.remove_peer(&addr).await; + return; + } + }; tasks.spawn(async move { log::debug!("Attempting to connect to {}", addr); @@ -1241,10 +1247,15 @@ impl PeerNetworkManager { log::warn!("Failed to save reputation data on shutdown: {}", e); } - // Wait for tasks to complete - let mut tasks = self.tasks.lock().await; + // Take tasks out of the mutex so we don't hold the lock while draining. + // This prevents a deadlock where a task (e.g. maintenance loop) tries to + // acquire self.tasks via connect_to_peer() while we hold the lock here. + let mut tasks = { + let mut guard = self.tasks.lock().await; + std::mem::take(&mut *guard) + }; while let Some(result) = tasks.join_next().await { - if let Err(e) = result { + if let Err(e) = &result { log::error!("Task join error: {}", e); } } From 86ebbab4abe4b51976e81aedf980b4bbd3a01324 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Mon, 16 Feb 2026 15:49:20 +0100 Subject: [PATCH 06/23] doc: document logging --- dash-spv/CLAUDE.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dash-spv/CLAUDE.md b/dash-spv/CLAUDE.md index dacd9771e..97c490513 100644 --- a/dash-spv/CLAUDE.md +++ b/dash-spv/CLAUDE.md @@ -168,6 +168,9 @@ Use domain-specific error types: - **Built on**: `dashcore` library with Dash-specific features enabled - **Async runtime**: Tokio with full feature set +### Logging Convention +The project uses the `tracing` framework for structured logging. New code should use `tracing::` macros (`tracing::info!`, `tracing::debug!`, `tracing::warn!`, `tracing::error!`). + ## Key Implementation Details ### Storage Architecture From 85cf2bdc56ed12f9e50309f2255bfda66ad57539 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Mon, 16 Feb 2026 16:06:41 +0100 Subject: [PATCH 07/23] feat(spv): surface FatalNetwork errors to API consumers Add SyncEvent::ManagerExited variant and set SyncState::Error on fatal network exits so consumers can detect when a manager stops permanently. Extract handle_fatal_exit helper to deduplicate the 4 FatalNetwork paths. 
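For reviewers, a minimal sketch of the consumer-side handling this enables (illustrative only; it assumes the existing subscribe_events() accessor on DashSpvClient, visible in the events.rs hunk below, and a surrounding async context):

    let mut events = client.subscribe_events();
    while let Ok(event) = events.recv().await {
        if let SyncEvent::ManagerExited { manager, error } = event {
            // The affected sync phase has stopped permanently; the client
            // must be restarted to recover it.
            tracing::error!("sync manager {} exited: {}", manager, error);
            break;
        }
    }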
Co-Authored-By: Claude Opus 4.6 --- dash-spv/src/client/events.rs | 7 ++ dash-spv/src/error.rs | 21 ++++-- dash-spv/src/network/manager.rs | 16 +++-- dash-spv/src/sync/events.rs | 25 ++++++- dash-spv/src/sync/sync_manager.rs | 116 ++++++++++++++++++++---------- 5 files changed, 133 insertions(+), 52 deletions(-) diff --git a/dash-spv/src/client/events.rs b/dash-spv/src/client/events.rs index 2c929fe47..c6753f506 100644 --- a/dash-spv/src/client/events.rs +++ b/dash-spv/src/client/events.rs @@ -26,6 +26,13 @@ impl DashSpvClient broadcast::Receiver { self.sync_coordinator.subscribe_events() } diff --git a/dash-spv/src/error.rs b/dash-spv/src/error.rs index d314f99ee..0eab0a001 100644 --- a/dash-spv/src/error.rs +++ b/dash-spv/src/error.rs @@ -212,13 +212,24 @@ pub enum SyncError { /// Network-related errors (e.g., connection failures, protocol errors). /// - /// **Important:** When returned from [`SyncManager::tick()`], this variant causes the - /// manager task loop to exit immediately. Only return it from `tick()` for fatal, - /// unrecoverable conditions (e.g., the request channel is closed). For transient - /// network issues use [`SyncError::Timeout`] or handle them with internal retry logic. + /// For transient network issues, use this variant or [`SyncError::Timeout`]. + /// For fatal, unrecoverable conditions (e.g., request channel closed), + /// use [`SyncError::FatalNetwork`] instead. #[error("Network error: {0}")] Network(String), + /// Fatal network condition that causes the manager task loop to exit immediately. + /// + /// Unlike [`SyncError::Network`], which is logged and allows the loop to continue, + /// this variant signals an unrecoverable failure (e.g., the request channel is closed) + /// and causes the manager to stop permanently. + /// + /// **Consumer responsibility:** When this error occurs, [`SyncEvent::ManagerExited`] + /// is emitted and [`SyncState::Error`] is set on the affected manager's progress. + /// The consumer must restart the SPV client to recover the affected sync phase. + #[error("Fatal network error (manager exiting): {0}")] + FatalNetwork(String), + /// Validation errors for data received during sync (e.g., invalid headers, invalid proofs) /// Use this for data validation errors, not state errors #[error("Validation error: {0}")] @@ -245,7 +256,7 @@ impl SyncError { SyncError::Timeout(_) => "timeout", SyncError::Validation(_) => "validation", SyncError::MissingDependency(_) => "dependency", - SyncError::Network(_) => "network", + SyncError::Network(_) | SyncError::FatalNetwork(_) => "network", SyncError::Storage(_) => "storage", SyncError::Headers2DecompressionFailed(_) => "headers2", SyncError::MasternodeSyncFailed(_) => "masternode", diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index c1f58afee..6c6db9088 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -249,12 +249,12 @@ impl PeerNetworkManager { } }; tasks.spawn(async move { - log::debug!("Attempting to connect to {}", addr); + tracing::debug!("Attempting to connect to {}", addr); let connect_result = tokio::select! 
{ result = Peer::connect(addr, CONNECTION_TIMEOUT.as_secs(), network) => result, _ = shutdown_token.cancelled() => { - log::debug!("Connection to {} cancelled by shutdown", addr); + tracing::debug!("Connection to {} cancelled by shutdown", addr); pool.remove_peer(&addr).await; return; } @@ -267,11 +267,11 @@ impl PeerNetworkManager { HandshakeManager::new(network, mempool_strategy, user_agent); match handshake_manager.perform_handshake(&mut peer).await { Ok(_) => { - log::info!("Successfully connected to {}", addr); + tracing::info!("Successfully connected to {}", addr); // Request addresses from the peer for discovery if let Err(e) = peer.send_message(NetworkMessage::GetAddr).await { - log::warn!("Failed to send GetAddr to {}: {}", addr, e); + tracing::warn!("Failed to send GetAddr to {}: {}", addr, e); } // Record successful connection @@ -279,7 +279,7 @@ impl PeerNetworkManager { // Add to pool if let Err(e) = pool.add_peer(addr, peer).await { - log::error!("Failed to add peer to pool: {}", e); + tracing::error!("Failed to add peer to pool: {}", e); return; } @@ -799,6 +799,7 @@ impl PeerNetworkManager { if !this.pool.is_connected(addr).await && !this.pool.is_connecting(addr).await { + if this.shutdown_token.is_cancelled() { break; } log::info!("Reconnecting to exclusive peer: {}", addr); this.connect_to_peer(*addr).await; } @@ -818,6 +819,7 @@ impl PeerNetworkManager { if !this.pool.is_connected(&addr).await && !this.pool.is_connecting(&addr).await { + if this.shutdown_token.is_cancelled() { break; } this.connect_to_peer(addr).await; attempted += 1; if attempted >= needed { @@ -1255,8 +1257,8 @@ impl PeerNetworkManager { std::mem::take(&mut *guard) }; while let Some(result) = tasks.join_next().await { - if let Err(e) = &result { - log::error!("Task join error: {}", e); + if let Err(e) = result { + tracing::error!("Task join error: {}", e); } } diff --git a/dash-spv/src/sync/events.rs b/dash-spv/src/sync/events.rs index 11c9f66ac..cea468f8e 100644 --- a/dash-spv/src/sync/events.rs +++ b/dash-spv/src/sync/events.rs @@ -109,7 +109,9 @@ pub enum SyncEvent { height: u32, }, - /// A manager encountered a recoverable error. + /// A manager encountered a non-fatal error (logged, loop continues). + /// + /// For fatal errors that cause the manager to exit, see [`SyncEvent::ManagerExited`]. /// /// Emitted by: Any manager /// Consumed by: Coordinator (for logging/monitoring) @@ -120,6 +122,20 @@ pub enum SyncEvent { error: String, }, + /// A manager task has exited due to a fatal error and will not restart. + /// + /// This is a **critical event** that consumers MUST handle. The affected sync + /// phase has stopped permanently. The client should be restarted to recover. + /// + /// Emitted by: Any manager (on `SyncError::FatalNetwork`) + /// Consumer action required: Restart the SPV client or take corrective action. + ManagerExited { + /// Which manager exited + manager: ManagerIdentifier, + /// Reason for exit + error: String, + }, + /// ChainLock received and processed. /// /// Emitted by: `ChainLockManager` @@ -218,6 +234,13 @@ impl SyncEvent { } => { format!("ManagerError({}, {})", manager, error) } + SyncEvent::ManagerExited { + manager, + error, + .. 
+ } => { + format!("ManagerExited({}, {})", manager, error) + } SyncEvent::ChainLockReceived { chain_lock, validated, diff --git a/dash-spv/src/sync/sync_manager.rs b/dash-spv/src/sync/sync_manager.rs index 09f8861f6..e7c242a18 100644 --- a/dash-spv/src/sync/sync_manager.rs +++ b/dash-spv/src/sync/sync_manager.rs @@ -62,6 +62,30 @@ impl SyncManagerTaskContext { self.emit_sync_event(event); } } + + /// Handle a fatal network error: log, update progress to Error state, and + /// emit both `ManagerError` and `ManagerExited` events. + /// + /// The caller must call `set_state(SyncState::Error)` before this method + /// so that `progress` reflects the error state. + pub(super) fn handle_fatal_exit( + &self, + manager: ManagerIdentifier, + source: &str, + progress: SyncManagerProgress, + msg: &str, + ) { + tracing::warn!("{} {} fatal network error, exiting: {}", manager, source, msg); + self.progress_sender.send(progress).ok(); + self.emit_sync_event(SyncEvent::ManagerError { + manager, + error: format!("Fatal network error (exiting): {}", msg), + }); + self.emit_sync_event(SyncEvent::ManagerExited { + manager, + error: format!("Fatal network error: {}", msg), + }); + } } #[async_trait] @@ -235,6 +259,11 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } self.try_emit_progress(progress_before, &context.progress_sender); } + Err(SyncError::FatalNetwork(ref msg)) => { + self.set_state(SyncState::Error); + context.handle_fatal_exit(identifier, "message handler", self.progress(), msg); + break; + } Err(e) => { tracing::error!("{} error handling message: {}", identifier, e); let error_event = SyncEvent::ManagerError { @@ -261,6 +290,11 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } self.try_emit_progress(progress_before, &context.progress_sender); } + Err(SyncError::FatalNetwork(ref msg)) => { + self.set_state(SyncState::Error); + context.handle_fatal_exit(identifier, "sync event handler", self.progress(), msg); + break; + } Err(e) => { tracing::error!("{} error handling event: {}", identifier, e); } @@ -288,6 +322,11 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } self.try_emit_progress(progress_before, &context.progress_sender); } + Err(SyncError::FatalNetwork(ref msg)) => { + self.set_state(SyncState::Error); + context.handle_fatal_exit(identifier, "network event handler", self.progress(), msg); + break; + } Err(e) => { tracing::error!("{} error handling network event: {}", identifier, e); } @@ -309,12 +348,9 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } self.try_emit_progress(progress_before, &context.progress_sender); } - Err(SyncError::Network(ref msg)) => { - tracing::warn!("{} tick network error, exiting: {}", identifier, msg); - context.emit_sync_event(SyncEvent::ManagerError { - manager: identifier, - error: format!("Network error (exiting): {}", msg), - }); + Err(SyncError::FatalNetwork(ref msg)) => { + self.set_state(SyncState::Error); + context.handle_fatal_exit(identifier, "tick", self.progress(), msg); break; } Err(e) => { @@ -361,18 +397,20 @@ mod tests { fn identifier(&self) -> ManagerIdentifier { self.identifier } - fn state(&self) -> SyncState { self.state } - fn set_state(&mut self, state: SyncState) { self.state = state; } - fn wanted_message_types(&self) -> &'static [MessageType] { &[] } + fn progress(&self) -> SyncManagerProgress { + let mut progress = BlockHeadersProgress::default(); + progress.set_state(self.state); + SyncManagerProgress::BlockHeaders(progress) + } async fn handle_message( &mut self, @@ -396,12 +434,6 @@ mod 
tests { self.tick_count.fetch_add(1, Ordering::Relaxed); Ok(vec![]) } - - fn progress(&self) -> SyncManagerProgress { - let mut progress = BlockHeadersProgress::default(); - progress.set_state(self.state); - SyncManagerProgress::BlockHeaders(progress) - } } #[tokio::test] @@ -456,7 +488,7 @@ mod tests { assert!(tick_count.load(Ordering::Relaxed) > 0); } - /// Mock manager whose tick() returns SyncError::Network after a threshold. + /// Mock manager whose tick() returns SyncError::FatalNetwork after a threshold. struct NetworkErrorManager { identifier: ManagerIdentifier, state: SyncState, @@ -466,9 +498,7 @@ mod tests { impl std::fmt::Debug for NetworkErrorManager { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("NetworkErrorManager") - .field("identifier", &self.identifier) - .finish() + f.debug_struct("NetworkErrorManager").field("identifier", &self.identifier).finish() } } @@ -477,18 +507,20 @@ mod tests { fn identifier(&self) -> ManagerIdentifier { self.identifier } - fn state(&self) -> SyncState { self.state } - fn set_state(&mut self, state: SyncState) { self.state = state; } - fn wanted_message_types(&self) -> &'static [MessageType] { &[] } + fn progress(&self) -> SyncManagerProgress { + let mut progress = BlockHeadersProgress::default(); + progress.set_state(self.state); + SyncManagerProgress::BlockHeaders(progress) + } async fn handle_message( &mut self, @@ -506,26 +538,17 @@ mod tests { Ok(vec![]) } - async fn tick( - &mut self, - _requests: &RequestSender, - ) -> SyncResult> { + async fn tick(&mut self, _requests: &RequestSender) -> SyncResult> { let count = self.tick_count.fetch_add(1, Ordering::Relaxed); if count >= self.error_after { - Err(SyncError::Network("channel closed".into())) + Err(SyncError::FatalNetwork("channel closed".into())) } else { Ok(vec![]) } } - - fn progress(&self) -> SyncManagerProgress { - let mut progress = BlockHeadersProgress::default(); - progress.set_state(self.state); - SyncManagerProgress::BlockHeaders(progress) - } } - /// Given a manager whose tick() returns SyncError::Network after a few calls, + /// Given a manager whose tick() returns SyncError::FatalNetwork after a few calls, /// When the task loop processes the error, /// Then it exits promptly without requiring a shutdown signal. 
#[tokio::test] @@ -546,17 +569,20 @@ mod tests { let (req_tx, _req_rx) = mpsc::unbounded_channel::(); let requests = RequestSender::new(req_tx); let shutdown = CancellationToken::new(); - let (progress_sender, _progress_rx) = watch::channel(manager.progress()); + let (progress_sender, progress_rx) = watch::channel(manager.progress()); let context = SyncManagerTaskContext { message_receiver, - sync_event_sender, + sync_event_sender: sync_event_sender.clone(), network_event_receiver: network_event_sender.subscribe(), requests, shutdown: shutdown.clone(), progress_sender, }; + // Subscribe to sync events to verify ManagerExited is emitted + let mut event_rx = sync_event_sender.subscribe(); + // Spawn the task — it should exit on its own when tick returns Network error let handle = tokio::spawn(async move { manager.run(context).await }); @@ -569,8 +595,20 @@ mod tests { assert!(result.is_ok()); assert_eq!(result.unwrap(), ManagerIdentifier::BlockHeader); - // Verify tick was called at least error_after + 1 times - // (the error-producing call counts too) - assert!(tick_count.load(Ordering::Relaxed) > 3); + // Verify tick was called exactly 4 times: indices 0,1,2 succeed, index 3 returns error + assert_eq!(tick_count.load(Ordering::Relaxed), 4); + + // Verify progress state is Error after fatal network exit + assert_eq!(progress_rx.borrow().state(), SyncState::Error); + + // Verify ManagerExited event was emitted + let mut found_exited = false; + while let Ok(event) = event_rx.try_recv() { + if matches!(event, SyncEvent::ManagerExited { .. }) { + found_exited = true; + break; + } + } + assert!(found_exited, "ManagerExited event should have been emitted"); } } From 8541836fd03d3e6542b9b09cf81fd0b16237c56e Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Mon, 16 Feb 2026 16:35:57 +0100 Subject: [PATCH 08/23] fix(spv): self-recover on FatalNetwork instead of exiting manager loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Managers now reset to WaitingForConnections and continue their loop on FatalNetwork errors, letting PeersUpdated events re-trigger sync. This eliminates the need for consumers to detect ManagerExited and restart the entire client. Managers only exit when the shutdown token fires. - Rename handle_fatal_exit → handle_fatal_network_error - Change all 4 FatalNetwork paths from break to continue - Remove SyncEvent::ManagerExited variant (no longer needed) - Update FatalNetwork doc to reflect reset behavior Co-Authored-By: Claude Opus 4.6 --- dash-spv/src/client/events.rs | 7 --- dash-spv/src/error.rs | 15 +++--- dash-spv/src/sync/events.rs | 25 +-------- dash-spv/src/sync/sync_manager.rs | 89 ++++++++++++++++--------------- 4 files changed, 52 insertions(+), 84 deletions(-) diff --git a/dash-spv/src/client/events.rs b/dash-spv/src/client/events.rs index c6753f506..2c929fe47 100644 --- a/dash-spv/src/client/events.rs +++ b/dash-spv/src/client/events.rs @@ -26,13 +26,6 @@ impl DashSpvClient broadcast::Receiver { self.sync_coordinator.subscribe_events() } diff --git a/dash-spv/src/error.rs b/dash-spv/src/error.rs index 0eab0a001..564e452d5 100644 --- a/dash-spv/src/error.rs +++ b/dash-spv/src/error.rs @@ -218,16 +218,13 @@ pub enum SyncError { #[error("Network error: {0}")] Network(String), - /// Fatal network condition that causes the manager task loop to exit immediately. + /// Fatal network condition that resets the manager to WaitingForConnections. 
/// - /// Unlike [`SyncError::Network`], which is logged and allows the loop to continue, - /// this variant signals an unrecoverable failure (e.g., the request channel is closed) - /// and causes the manager to stop permanently. - /// - /// **Consumer responsibility:** When this error occurs, [`SyncEvent::ManagerExited`] - /// is emitted and [`SyncState::Error`] is set on the affected manager's progress. - /// The consumer must restart the SPV client to recover the affected sync phase. - #[error("Fatal network error (manager exiting): {0}")] + /// Unlike [`SyncError::Network`], which is logged and allows the current + /// operation to continue, this variant signals an unrecoverable failure + /// for the current connection (e.g., the request channel is closed). + /// The manager resets to `WaitingForConnections` and waits for new peers. + #[error("Fatal network error: {0}")] FatalNetwork(String), /// Validation errors for data received during sync (e.g., invalid headers, invalid proofs) diff --git a/dash-spv/src/sync/events.rs b/dash-spv/src/sync/events.rs index cea468f8e..aad953512 100644 --- a/dash-spv/src/sync/events.rs +++ b/dash-spv/src/sync/events.rs @@ -109,9 +109,7 @@ pub enum SyncEvent { height: u32, }, - /// A manager encountered a non-fatal error (logged, loop continues). - /// - /// For fatal errors that cause the manager to exit, see [`SyncEvent::ManagerExited`]. + /// A manager encountered an error during sync. /// /// Emitted by: Any manager /// Consumed by: Coordinator (for logging/monitoring) @@ -122,20 +120,6 @@ pub enum SyncEvent { error: String, }, - /// A manager task has exited due to a fatal error and will not restart. - /// - /// This is a **critical event** that consumers MUST handle. The affected sync - /// phase has stopped permanently. The client should be restarted to recover. - /// - /// Emitted by: Any manager (on `SyncError::FatalNetwork`) - /// Consumer action required: Restart the SPV client or take corrective action. - ManagerExited { - /// Which manager exited - manager: ManagerIdentifier, - /// Reason for exit - error: String, - }, - /// ChainLock received and processed. /// /// Emitted by: `ChainLockManager` @@ -234,13 +218,6 @@ impl SyncEvent { } => { format!("ManagerError({}, {})", manager, error) } - SyncEvent::ManagerExited { - manager, - error, - .. - } => { - format!("ManagerExited({}, {})", manager, error) - } SyncEvent::ChainLockReceived { chain_lock, validated, diff --git a/dash-spv/src/sync/sync_manager.rs b/dash-spv/src/sync/sync_manager.rs index e7c242a18..62d41f0ef 100644 --- a/dash-spv/src/sync/sync_manager.rs +++ b/dash-spv/src/sync/sync_manager.rs @@ -63,27 +63,22 @@ impl SyncManagerTaskContext { } } - /// Handle a fatal network error: log, update progress to Error state, and - /// emit both `ManagerError` and `ManagerExited` events. + /// Handle a fatal network error: log, update progress, and emit `ManagerError`. /// - /// The caller must call `set_state(SyncState::Error)` before this method - /// so that `progress` reflects the error state. - pub(super) fn handle_fatal_exit( + /// The caller must call `set_state(SyncState::WaitingForConnections)` before + /// this method so that `progress` reflects the reset state. 
+ pub(super) fn handle_fatal_network_error( &self, manager: ManagerIdentifier, source: &str, progress: SyncManagerProgress, msg: &str, ) { - tracing::warn!("{} {} fatal network error, exiting: {}", manager, source, msg); + tracing::warn!("{} {} fatal network error, resetting to WaitingForConnections: {}", manager, source, msg); self.progress_sender.send(progress).ok(); self.emit_sync_event(SyncEvent::ManagerError { manager, - error: format!("Fatal network error (exiting): {}", msg), - }); - self.emit_sync_event(SyncEvent::ManagerExited { - manager, - error: format!("Fatal network error: {}", msg), + error: format!("Fatal network error ({}): {}", source, msg), }); } } @@ -260,9 +255,9 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { self.try_emit_progress(progress_before, &context.progress_sender); } Err(SyncError::FatalNetwork(ref msg)) => { - self.set_state(SyncState::Error); - context.handle_fatal_exit(identifier, "message handler", self.progress(), msg); - break; + self.set_state(SyncState::WaitingForConnections); + context.handle_fatal_network_error(identifier, "message handler", self.progress(), msg); + continue; } Err(e) => { tracing::error!("{} error handling message: {}", identifier, e); @@ -291,9 +286,9 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { self.try_emit_progress(progress_before, &context.progress_sender); } Err(SyncError::FatalNetwork(ref msg)) => { - self.set_state(SyncState::Error); - context.handle_fatal_exit(identifier, "sync event handler", self.progress(), msg); - break; + self.set_state(SyncState::WaitingForConnections); + context.handle_fatal_network_error(identifier, "sync event handler", self.progress(), msg); + continue; } Err(e) => { tracing::error!("{} error handling event: {}", identifier, e); @@ -323,9 +318,9 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { self.try_emit_progress(progress_before, &context.progress_sender); } Err(SyncError::FatalNetwork(ref msg)) => { - self.set_state(SyncState::Error); - context.handle_fatal_exit(identifier, "network event handler", self.progress(), msg); - break; + self.set_state(SyncState::WaitingForConnections); + context.handle_fatal_network_error(identifier, "network event handler", self.progress(), msg); + continue; } Err(e) => { tracing::error!("{} error handling network event: {}", identifier, e); @@ -349,9 +344,9 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { self.try_emit_progress(progress_before, &context.progress_sender); } Err(SyncError::FatalNetwork(ref msg)) => { - self.set_state(SyncState::Error); - context.handle_fatal_exit(identifier, "tick", self.progress(), msg); - break; + self.set_state(SyncState::WaitingForConnections); + context.handle_fatal_network_error(identifier, "tick", self.progress(), msg); + continue; } Err(e) => { tracing::error!("{} tick error: {}", identifier, e); @@ -550,9 +545,9 @@ mod tests { /// Given a manager whose tick() returns SyncError::FatalNetwork after a few calls, /// When the task loop processes the error, - /// Then it exits promptly without requiring a shutdown signal. + /// Then it resets to WaitingForConnections and keeps running. 
#[tokio::test] - async fn test_manager_exits_on_tick_network_error() { + async fn test_manager_resets_on_fatal_network_error() { let tick_count = Arc::new(AtomicU32::new(0)); let manager = NetworkErrorManager { @@ -580,35 +575,41 @@ mod tests { progress_sender, }; - // Subscribe to sync events to verify ManagerExited is emitted + // Subscribe to sync events to verify ManagerError is emitted let mut event_rx = sync_event_sender.subscribe(); - // Spawn the task — it should exit on its own when tick returns Network error + // Spawn the task — it should keep running after the FatalNetwork error let handle = tokio::spawn(async move { manager.run(context).await }); - // Wait for the task to complete with a timeout (should be fast) - let result = tokio::time::timeout(Duration::from_secs(2), handle) - .await - .expect("task should exit promptly on network error") - .unwrap(); - - assert!(result.is_ok()); - assert_eq!(result.unwrap(), ManagerIdentifier::BlockHeader); + // Wait long enough for the error to fire and several more ticks to occur + tokio::time::sleep(Duration::from_millis(500)).await; - // Verify tick was called exactly 4 times: indices 0,1,2 succeed, index 3 returns error - assert_eq!(tick_count.load(Ordering::Relaxed), 4); + // Verify progress state is WaitingForConnections (not Error) + assert_eq!(progress_rx.borrow().state(), SyncState::WaitingForConnections); - // Verify progress state is Error after fatal network exit - assert_eq!(progress_rx.borrow().state(), SyncState::Error); + // Verify tick was called more than 4 times (manager kept running after the error) + assert!(tick_count.load(Ordering::Relaxed) > 4, + "manager should keep ticking after FatalNetwork error"); - // Verify ManagerExited event was emitted - let mut found_exited = false; + // Verify ManagerError event was emitted + let mut found_error = false; while let Ok(event) = event_rx.try_recv() { - if matches!(event, SyncEvent::ManagerExited { .. }) { - found_exited = true; + if matches!(event, SyncEvent::ManagerError { .. }) { + found_error = true; break; } } - assert!(found_exited, "ManagerExited event should have been emitted"); + assert!(found_error, "ManagerError event should have been emitted"); + + // Shut down the manager via the shutdown token + shutdown.cancel(); + + let result = tokio::time::timeout(Duration::from_secs(2), handle) + .await + .expect("task should exit after shutdown signal") + .unwrap(); + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), ManagerIdentifier::BlockHeader); } } From cb84ccc8a8f01cdea11f9f0120358b133ebabf6a Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Mon, 16 Feb 2026 16:40:16 +0100 Subject: [PATCH 09/23] fix(spv): remove take() in PeerNetworkManager::shutdown to close race The JoinSet was moved out of the mutex via std::mem::take before draining. This created a race where in-flight tasks that had not yet observed the cancellation token could spawn new tasks into the original (now empty) JoinSet, leaving them orphaned. Since connect_to_peer() already uses select! with the cancellation token when acquiring the tasks lock, no deadlock can occur. Drain the JoinSet while holding the lock instead. 
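To make the race concrete, this is the removed pattern, simplified from the old shutdown(). After the take, the mutex holds a fresh, empty JoinSet, so any task spawned through self.tasks during the drain lands in that fresh set and is never joined:

    let mut tasks = {
        let mut guard = self.tasks.lock().await;
        std::mem::take(&mut *guard) // mutex now holds a fresh, empty JoinSet
    };
    // Race window: an in-flight task may call connect_to_peer(), which
    // spawns into the fresh JoinSet behind the mutex; nothing joins that set.
    while let Some(result) = tasks.join_next().await {
        if let Err(e) = result {
            tracing::error!("Task join error: {}", e);
        }
    }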
Co-Authored-By: Claude Opus 4.6 --- dash-spv/src/network/manager.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index 6c6db9088..2a10d026e 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -1249,13 +1249,10 @@ impl PeerNetworkManager { log::warn!("Failed to save reputation data on shutdown: {}", e); } - // Take tasks out of the mutex so we don't hold the lock while draining. - // This prevents a deadlock where a task (e.g. maintenance loop) tries to - // acquire self.tasks via connect_to_peer() while we hold the lock here. - let mut tasks = { - let mut guard = self.tasks.lock().await; - std::mem::take(&mut *guard) - }; + // Drain tasks while holding the lock. connect_to_peer() already uses + // `select!` with the cancellation token when acquiring this lock, so no + // deadlock can occur once the shutdown token is cancelled above. + let mut tasks = self.tasks.lock().await; while let Some(result) = tasks.join_next().await { if let Err(e) = result { tracing::error!("Task join error: {}", e); From f4a52dd3235cd7f74a6350f58047793513f1f617 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Mon, 16 Feb 2026 16:44:32 +0100 Subject: [PATCH 10/23] fix(spv): add missing shutdown checks in maintenance loop Add shutdown_token check before connect_to_peer in the DNS discovery peer loop, and add an early-exit check before the ping/save section so the maintenance loop exits promptly during shutdown. Co-Authored-By: Claude Opus 4.6 --- dash-spv/src/network/manager.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index 2a10d026e..36542be39 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -830,6 +830,8 @@ impl PeerNetworkManager { } } + if this.shutdown_token.is_cancelled() { break; } + // Send ping to all peers if needed for (addr, peer) in this.pool.get_all_peers().await { let mut peer_guard = peer.write().await; @@ -882,6 +884,7 @@ impl PeerNetworkManager { let mut dns_attempted = 0; for addr in dns_peers.iter() { if !this.pool.is_connected(addr).await && !this.pool.is_connecting(addr).await { + if this.shutdown_token.is_cancelled() { break; } this.connect_to_peer(*addr).await; dns_attempted += 1; if dns_attempted >= needed { From 7356e2b62096a8a47e05119bee92f703218c1e4d Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Mon, 16 Feb 2026 17:18:04 +0100 Subject: [PATCH 11/23] fix(spv): address PR #440 audit findings Remove FatalNetwork variant (never produced in production) and match on Network instead. Add 2-second cooldown after network-error recovery to prevent log/event flooding from the 100ms tick loop. Replace the error-prone handle_fatal_network_error (which required callers to call set_state first) with a recover_from_network_error default trait method that enforces correct ordering. Migrate all log:: calls to tracing:: in manager.rs to match project convention. 
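The recovery path now has roughly this shape (sketch only; recover_from_network_error is the default trait method named above, but its exact signature and the cooldown constant are assumptions, not the final code):

    async fn recover_from_network_error(
        &mut self,
        context: &SyncManagerTaskContext,
        source: &str,
        msg: &str,
    ) {
        // Ordering is enforced here instead of trusting every call site:
        self.set_state(SyncState::WaitingForConnections);
        context.progress_sender.send(self.progress()).ok();
        context.emit_sync_event(SyncEvent::ManagerError {
            manager: self.identifier(),
            error: format!("Network error ({}): {}", source, msg),
        });
        // Cooldown so the 100ms tick loop cannot flood logs/events while
        // the connection is still down.
        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
    }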
Co-Authored-By: Claude Opus 4.6 --- dash-spv/src/error.rs | 17 +--- dash-spv/src/network/manager.rs | 160 +++++++++++++++--------------- dash-spv/src/sync/sync_manager.rs | 97 +++++++++++------- 3 files changed, 143 insertions(+), 131 deletions(-) diff --git a/dash-spv/src/error.rs b/dash-spv/src/error.rs index 564e452d5..8622f7ddb 100644 --- a/dash-spv/src/error.rs +++ b/dash-spv/src/error.rs @@ -212,21 +212,12 @@ pub enum SyncError { /// Network-related errors (e.g., connection failures, protocol errors). /// - /// For transient network issues, use this variant or [`SyncError::Timeout`]. - /// For fatal, unrecoverable conditions (e.g., request channel closed), - /// use [`SyncError::FatalNetwork`] instead. + /// When returned from a `SyncManager` method, the default `run()` loop + /// resets the manager to `WaitingForConnections` and applies a cooldown + /// to prevent log/event flooding. #[error("Network error: {0}")] Network(String), - /// Fatal network condition that resets the manager to WaitingForConnections. - /// - /// Unlike [`SyncError::Network`], which is logged and allows the current - /// operation to continue, this variant signals an unrecoverable failure - /// for the current connection (e.g., the request channel is closed). - /// The manager resets to `WaitingForConnections` and waits for new peers. - #[error("Fatal network error: {0}")] - FatalNetwork(String), - /// Validation errors for data received during sync (e.g., invalid headers, invalid proofs) /// Use this for data validation errors, not state errors #[error("Validation error: {0}")] @@ -253,7 +244,7 @@ impl SyncError { SyncError::Timeout(_) => "timeout", SyncError::Validation(_) => "validation", SyncError::MissingDependency(_) => "dependency", - SyncError::Network(_) | SyncError::FatalNetwork(_) => "network", + SyncError::Network(_) => "network", SyncError::Storage(_) => "storage", SyncError::Headers2DecompressionFailed(_) => "headers2", SyncError::MasternodeSyncFailed(_) => "masternode", diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index 36542be39..a424e3693 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -94,7 +94,7 @@ impl PeerNetworkManager { let reputation_manager = Arc::new(PeerReputationManager::new()); if let Err(e) = reputation_manager.load_from_storage(&peer_store).await { - log::warn!("Failed to load peer reputation data: {}", e); + tracing::warn!("Failed to load peer reputation data: {}", e); } // Determine exclusive mode: either explicitly requested or peers were provided @@ -148,7 +148,7 @@ impl PeerNetworkManager { /// Start the network manager pub async fn start(&self) -> Result<(), Error> { - log::info!("Starting peer network manager for {:?}", self.network); + tracing::info!("Starting peer network manager for {:?}", self.network); let mut peer_addresses: Vec = self .initial_peers @@ -157,7 +157,7 @@ impl PeerNetworkManager { .collect(); if self.exclusive_mode { - log::info!( + tracing::info!( "Exclusive peer mode: connecting ONLY to {} specified peer(s)", self.initial_peers.len() ); @@ -171,7 +171,7 @@ impl PeerNetworkManager { // If we still have no peers, immediately discover via DNS if peer_addresses.is_empty() { - log::info!( + tracing::info!( "No peers configured, performing immediate DNS discovery for {:?}", self.network ); @@ -183,13 +183,13 @@ impl PeerNetworkManager { .take(TARGET_PEERS) .map(|addr| AddrV2Message::new(addr, ServiceFlags::NETWORK)), ); - log::info!( + tracing::info!( "DNS discovery found {} peers, 
using {} for startup", dns_peers_found, peer_addresses.len() ); } else { - log::info!( + tracing::info!( "Starting with {} peers from disk (DNS discovery will be used later if needed)", peer_addresses.len() ); @@ -211,7 +211,7 @@ impl PeerNetworkManager { async fn connect_to_peer(&self, addr: SocketAddr) { // Check reputation first if !self.reputation_manager.should_connect_to_peer(&addr).await { - log::warn!("Not connecting to {} due to bad reputation", addr); + tracing::warn!("Not connecting to {} due to bad reputation", addr); return; } @@ -317,7 +317,7 @@ impl PeerNetworkManager { .await; } Err(e) => { - log::warn!("Handshake failed with {}: {}", addr, e); + tracing::warn!("Handshake failed with {}: {}", addr, e); pool.remove_peer(&addr).await; // Update reputation for handshake failure reputation_manager @@ -333,7 +333,7 @@ impl PeerNetworkManager { } } Err(e) => { - log::debug!("Failed to connect to {}: {}", addr, e); + tracing::debug!("Failed to connect to {}: {}", addr, e); pool.remove_peer(&addr).await; // Minor reputation penalty for connection failure reputation_manager @@ -362,7 +362,7 @@ impl PeerNetworkManager { network_event_sender: broadcast::Sender, ) { tokio::spawn(async move { - log::debug!("Starting peer reader loop for {}", addr); + tracing::debug!("Starting peer reader loop for {}", addr); let mut loop_iteration = 0; let mut headers2_state = CompressionState::default(); @@ -371,7 +371,7 @@ impl PeerNetworkManager { // Check shutdown signal first with detailed logging if shutdown_token.is_cancelled() { - log::info!("Breaking peer reader loop for {} - shutdown signal received (iteration {})", addr, loop_iteration); + tracing::info!("Breaking peer reader loop for {} - shutdown signal received (iteration {})", addr, loop_iteration); break; } @@ -379,7 +379,7 @@ impl PeerNetworkManager { let peer = match pool.get_peer(&addr).await { Some(peer) => peer, None => { - log::warn!("Breaking peer reader loop for {} - peer no longer in pool (iteration {})", addr, loop_iteration); + tracing::warn!("Breaking peer reader loop for {} - peer no longer in pool (iteration {})", addr, loop_iteration); break; } }; @@ -389,7 +389,7 @@ impl PeerNetworkManager { // Try to get a read lock first to check if peer is available let peer_guard = peer.read().await; if !peer_guard.is_connected() { - log::warn!("Breaking peer reader loop for {} - peer no longer connected (iteration {})", addr, loop_iteration); + tracing::warn!("Breaking peer reader loop for {} - peer no longer connected (iteration {})", addr, loop_iteration); drop(peer_guard); break; } @@ -405,7 +405,7 @@ impl PeerNetworkManager { Ok(None) }, _ = shutdown_token.cancelled() => { - log::info!("Breaking peer reader loop for {} - shutdown signal received while reading (iteration {})", addr, loop_iteration); + tracing::info!("Breaking peer reader loop for {} - shutdown signal received while reading (iteration {})", addr, loop_iteration); break; } } @@ -414,7 +414,7 @@ impl PeerNetworkManager { match msg_result { Ok(Some(msg)) => { // Log all received messages at debug level to help troubleshoot - log::debug!("Received {:?} from {}", msg.cmd(), addr); + tracing::debug!("Received {:?} from {}", msg.cmd(), addr); // Handle some messages directly match &msg.inner() { @@ -424,7 +424,7 @@ impl PeerNetworkManager { } NetworkMessage::SendHeaders2 => { // Peer is indicating they will send us compressed headers - log::info!( + tracing::info!( "Peer {} sent SendHeaders2 - they will send compressed headers", addr ); @@ -438,7 +438,7 @@ impl 
PeerNetworkManager { continue; // Don't forward to client } NetworkMessage::GetAddr => { - log::trace!( + tracing::trace!( "Received GetAddr from {}, sending known addresses", addr ); @@ -446,7 +446,7 @@ impl PeerNetworkManager { let response = addrv2_handler.build_addr_response().await; let mut peer_guard = peer.write().await; if let Err(e) = peer_guard.send_message(response).await { - log::error!("Failed to send addr response to {}: {}", addr, e); + tracing::error!("Failed to send addr response to {}: {}", addr, e); } continue; // Don't forward GetAddr to client } @@ -454,10 +454,10 @@ impl PeerNetworkManager { // Handle ping directly let mut peer_guard = peer.write().await; if let Err(e) = peer_guard.handle_ping(*nonce).await { - log::error!("Failed to handle ping from {}: {}", addr, e); + tracing::error!("Failed to handle ping from {}: {}", addr, e); // If we can't send pong, connection is likely broken if matches!(e, NetworkError::ConnectionFailed(_)) { - log::warn!("Breaking peer reader loop for {} - failed to send pong response (iteration {})", addr, loop_iteration); + tracing::warn!("Breaking peer reader loop for {} - failed to send pong response (iteration {})", addr, loop_iteration); break; } } @@ -467,13 +467,13 @@ impl PeerNetworkManager { // Handle pong directly let mut peer_guard = peer.write().await; if let Err(e) = peer_guard.handle_pong(*nonce) { - log::error!("Failed to handle pong from {}: {}", addr, e); + tracing::error!("Failed to handle pong from {}: {}", addr, e); } continue; // Don't forward pong to client } NetworkMessage::Version(_) | NetworkMessage::Verack => { // These are handled during handshake, ignore here - log::trace!( + tracing::trace!( "Ignoring handshake message {:?} from {}", msg.cmd(), addr @@ -499,7 +499,7 @@ impl PeerNetworkManager { }) .collect(); if !converted.is_empty() { - log::debug!( + tracing::debug!( "Converted {} legacy addr entries from {}", converted.len(), addr @@ -510,7 +510,7 @@ impl PeerNetworkManager { } NetworkMessage::Headers(headers) => { // Log headers messages specifically - log::info!( + tracing::info!( "📨 Received Headers message from {} with {} headers! 
(regular uncompressed)", addr, headers.len() @@ -518,14 +518,14 @@ impl PeerNetworkManager { // Check if peer supports headers2 let peer_guard = peer.read().await; if peer_guard.supports_headers2() { - log::warn!("⚠️ Peer {} supports headers2 but sent regular headers - possible protocol issue", addr); + tracing::warn!("⚠️ Peer {} supports headers2 but sent regular headers - possible protocol issue", addr); } drop(peer_guard); // Forward to client } NetworkMessage::Headers2(headers2) => { // Decompress headers in network layer and forward as regular Headers - log::info!( + tracing::info!( "Received Headers2 from {} with {} compressed headers - decompressing", addr, headers2.headers.len() @@ -533,7 +533,7 @@ impl PeerNetworkManager { match headers2_state.process_headers(&headers2.headers) { Ok(headers) => { - log::info!( + tracing::info!( "Decompressed {} headers from {} - forwarding as regular Headers", headers.len(), addr @@ -545,7 +545,7 @@ impl PeerNetworkManager { continue; // Already sent, don't forward the original Headers2 } Err(e) => { - log::error!( + tracing::error!( "Headers2 decompression failed from {}: {} - disabling headers2", addr, e @@ -565,7 +565,7 @@ impl PeerNetworkManager { } NetworkMessage::GetHeaders(_) => { // SPV clients don't serve headers to peers - log::debug!( + tracing::debug!( "Received GetHeaders from {} - ignoring (SPV client)", addr ); @@ -573,7 +573,7 @@ impl PeerNetworkManager { } NetworkMessage::GetHeaders2(_) => { // SPV clients don't serve compressed headers to peers - log::debug!( + tracing::debug!( "Received GetHeaders2 from {} - ignoring (SPV client)", addr ); @@ -584,13 +584,13 @@ impl PeerNetworkManager { payload, } => { // Log unknown messages with more detail - log::warn!("Received unknown message from {}: command='{}', payload_len={}", + tracing::warn!("Received unknown message from {}: command='{}', payload_len={}", addr, command, payload.len()); // Still forward to client } _ => { // Forward other messages to client - log::trace!("Forwarding {:?} from {} to client", msg.cmd(), addr); + tracing::trace!("Forwarding {:?} from {} to client", msg.cmd(), addr); } } @@ -604,11 +604,11 @@ impl PeerNetworkManager { Err(e) => { match e { NetworkError::PeerDisconnected => { - log::info!("Peer {} disconnected", addr); + tracing::info!("Peer {} disconnected", addr); break; } NetworkError::Timeout => { - log::debug!("Timeout reading from {}, continuing...", addr); + tracing::debug!("Timeout reading from {}, continuing...", addr); // Minor reputation penalty for timeout reputation_manager .update_reputation( @@ -620,14 +620,14 @@ impl PeerNetworkManager { continue; } _ => { - log::error!("Fatal error reading from {}: {}", addr, e); + tracing::error!("Fatal error reading from {}: {}", addr, e); // Check if this is a serialization error that might have context if let NetworkError::Serialization(ref decode_error) = e { let error_msg = decode_error.to_string(); if error_msg.contains("unknown special transaction type") { - log::warn!("Peer {} sent block with unsupported transaction type: {}", addr, decode_error); - log::error!( + tracing::warn!("Peer {} sent block with unsupported transaction type: {}", addr, decode_error); + tracing::error!( "BLOCK DECODE FAILURE - Error details: {}", error_msg ); @@ -643,7 +643,7 @@ impl PeerNetworkManager { .contains("Failed to decode transactions for block") { // The error now includes the block hash - log::error!("Peer {} sent block that failed transaction decoding: {}", addr, decode_error); + tracing::error!("Peer {} sent 
block that failed transaction decoding: {}", addr, decode_error); // Try to extract the block hash from the error message if let Some(hash_start) = error_msg.find("block ") { if let Some(hash_end) = @@ -651,19 +651,19 @@ impl PeerNetworkManager { { let block_hash = &error_msg [hash_start + 6..hash_start + 6 + hash_end]; - log::error!("FAILING BLOCK HASH: {}", block_hash); + tracing::error!("FAILING BLOCK HASH: {}", block_hash); } } } else if error_msg.contains("IO error") { // This might be our wrapped error - log it prominently - log::error!("BLOCK DECODE FAILURE - IO error (possibly unknown transaction type) from peer {}", addr); - log::error!( + tracing::error!("BLOCK DECODE FAILURE - IO error (possibly unknown transaction type) from peer {}", addr); + tracing::error!( "Serialization error from {}: {}", addr, decode_error ); } else { - log::error!( + tracing::error!( "Serialization error from {}: {}", addr, decode_error @@ -681,7 +681,7 @@ impl PeerNetworkManager { } // Remove from pool - log::warn!("Disconnecting from {} (peer reader loop ended)", addr); + tracing::warn!("Disconnecting from {} (peer reader loop ended)", addr); let removed = pool.remove_peer(&addr).await; if removed.is_some() { // Decrement connected peer counter when a peer is removed @@ -723,7 +723,7 @@ impl PeerNetworkManager { }; let Some(mut request_rx) = request_rx else { - log::warn!("Request processor already started or receiver unavailable"); + tracing::warn!("Request processor already started or receiver unavailable"); return; }; @@ -732,13 +732,13 @@ impl PeerNetworkManager { let mut tasks = self.tasks.lock().await; tasks.spawn(async move { - log::info!("Starting request processor task"); + tracing::info!("Starting request processor task"); loop { tokio::select! { request = request_rx.recv() => { match request { Some(NetworkRequest::SendMessage(msg)) => { - log::debug!("Request processor: sending {}", msg.cmd()); + tracing::debug!("Request processor: sending {}", msg.cmd()); // Spawn each send concurrently to allow parallel requests across peers. 
let this = this.clone(); tokio::spawn(async move { @@ -758,18 +758,18 @@ impl PeerNetworkManager { } }; if let Err(e) = result { - log::error!("Request processor: failed to send message: {}", e); + tracing::error!("Request processor: failed to send message: {}", e); } }); } None => { - log::info!("Request processor: channel closed"); + tracing::info!("Request processor: channel closed"); break; } } } _ = shutdown_token.cancelled() => { - log::info!("Request processor: shutting down"); + tracing::info!("Request processor: shutting down"); break; } } @@ -790,7 +790,7 @@ impl PeerNetworkManager { this.pool.cleanup_disconnected().await; let count = this.pool.peer_count().await; - log::debug!("Connected peers: {}", count); + tracing::debug!("Connected peers: {}", count); // Keep the cached counter in sync with actual pool count this.connected_peer_count.store(count, Ordering::Relaxed); if this.exclusive_mode { @@ -800,7 +800,7 @@ impl PeerNetworkManager { && !this.pool.is_connecting(addr).await { if this.shutdown_token.is_cancelled() { break; } - log::info!("Reconnecting to exclusive peer: {}", addr); + tracing::info!("Reconnecting to exclusive peer: {}", addr); this.connect_to_peer(*addr).await; } } @@ -837,7 +837,7 @@ impl PeerNetworkManager { let mut peer_guard = peer.write().await; if peer_guard.should_ping() { if let Err(e) = peer_guard.send_ping().await { - log::error!("Failed to ping {}: {}", addr, e); + tracing::error!("Failed to ping {}: {}", addr, e); // Update reputation for ping failure this.reputation_manager .update_reputation(addr, misbehavior_scores::TIMEOUT, "Ping failed") @@ -853,20 +853,20 @@ impl PeerNetworkManager { this.addrv2_handler.get_known_addresses().await; if !addresses.is_empty() { if let Err(e) = this.peer_store.save_peers(&addresses).await { - log::warn!("Failed to save peers: {}", e); + tracing::warn!("Failed to save peers: {}", e); } } // Save reputation data periodically if let Err(e) = this.reputation_manager.save_to_storage(&*this.peer_store).await { - log::warn!("Failed to save reputation data: {}", e); + tracing::warn!("Failed to save reputation data: {}", e); } } tokio::select! { _ = time::sleep(MAINTENANCE_INTERVAL) => { - log::debug!("Maintenance interval elapsed"); + tracing::debug!("Maintenance interval elapsed"); } _ = dns_interval.tick(), if !this.exclusive_mode => { let count = this.pool.peer_count().await; @@ -876,7 +876,7 @@ impl PeerNetworkManager { let dns_peers = tokio::select! 
{ peers = this.discovery.discover_peers(this.network) => peers, _ = this.shutdown_token.cancelled() => { - log::info!("Maintenance loop shutting down during DNS discovery"); + tracing::info!("Maintenance loop shutting down during DNS discovery"); break; } }; @@ -894,7 +894,7 @@ impl PeerNetworkManager { } } _ = this.shutdown_token.cancelled() => { - log::info!("Maintenance loop shutting down"); + tracing::info!("Maintenance loop shutting down"); break; } } @@ -930,11 +930,11 @@ impl PeerNetworkManager { match filter_peer { Some(addr) => { - log::debug!("Selected peer {} for compact filter request", addr); + tracing::debug!("Selected peer {} for compact filter request", addr); addr } None => { - log::warn!("No peers support compact filters, cannot send {}", message.cmd()); + tracing::warn!("No peers support compact filters, cannot send {}", message.cmd()); return Err(NetworkError::ProtocolError( "No peers support compact filters".to_string(), )); @@ -966,7 +966,7 @@ impl PeerNetworkManager { let chosen = selected.unwrap_or(peers[0].0); if Some(chosen) != *current_sync_peer { - log::info!("Sync peer selected for Headers2: {}", chosen); + tracing::info!("Sync peer selected for Headers2: {}", chosen); *current_sync_peer = Some(chosen); } drop(current_sync_peer); @@ -982,7 +982,7 @@ impl PeerNetworkManager { } else { // Current sync peer disconnected, pick a new one let new_addr = peers[0].0; - log::info!( + tracing::info!( "Sync peer switched from {} to {} (previous peer disconnected)", current_addr, new_addr @@ -993,7 +993,7 @@ impl PeerNetworkManager { } else { // No current sync peer, pick the first available let new_addr = peers[0].0; - log::info!("Sync peer selected: {}", new_addr); + tracing::info!("Sync peer selected: {}", new_addr); *current_sync_peer = Some(new_addr); new_addr }; @@ -1017,7 +1017,7 @@ impl PeerNetworkManager { if !self.headers2_disabled.lock().await.contains(addr) && peer_supports_headers2 => { - log::debug!( + tracing::debug!( "Upgrading GetHeaders to GetHeaders2 for peer {}: {:?}", addr, get_headers @@ -1031,10 +1031,10 @@ impl PeerNetworkManager { NetworkMessage::GetHeaders(_) | NetworkMessage::GetCFilters(_) | NetworkMessage::GetCFHeaders(_) => { - log::debug!("Sending {} to {}", message.cmd(), addr); + tracing::debug!("Sending {} to {}", message.cmd(), addr); } NetworkMessage::GetHeaders2(gh2) => { - log::info!("📤 Sending GetHeaders2 to {} - version: {}, locator_count: {}, locator: {:?}, stop: {}", + tracing::info!("📤 Sending GetHeaders2 to {} - version: {}, locator_count: {}, locator: {:?}, stop: {}", addr, gh2.version, gh2.locator_hashes.len(), @@ -1043,10 +1043,10 @@ impl PeerNetworkManager { ); } NetworkMessage::SendHeaders2 => { - log::info!("🤝 Sending SendHeaders2 to {} - requesting compressed headers", addr); + tracing::info!("🤝 Sending SendHeaders2 to {} - requesting compressed headers", addr); } _ => { - log::trace!("Sending {:?} to {}", message.cmd(), addr); + tracing::trace!("Sending {:?} to {}", message.cmd(), addr); } } @@ -1132,7 +1132,7 @@ impl PeerNetworkManager { peer_guard.can_request_headers2() }; if peer_supports_headers2 && !self.headers2_disabled.lock().await.contains(addr) { - log::debug!("Upgrading GetHeaders to GetHeaders2 for peer {}", addr); + tracing::debug!("Upgrading GetHeaders to GetHeaders2 for peer {}", addr); NetworkMessage::GetHeaders2(get_headers) } else { NetworkMessage::GetHeaders(get_headers) @@ -1141,7 +1141,7 @@ impl PeerNetworkManager { other => other, }; - log::debug!( + tracing::debug!( "Distributing {} request to peer 
{} (round-robin idx {})", message.cmd(), addr, @@ -1165,10 +1165,10 @@ impl PeerNetworkManager { // Reduce verbosity for common sync messages match &message { NetworkMessage::GetHeaders(_) | NetworkMessage::GetCFilters(_) => { - log::debug!("Broadcasting {} to {}", message.cmd(), addr); + tracing::debug!("Broadcasting {} to {}", message.cmd(), addr); } _ => { - log::trace!("Broadcasting {:?} to {}", message.cmd(), addr); + tracing::trace!("Broadcasting {:?} to {}", message.cmd(), addr); } } let msg = message.clone(); @@ -1196,7 +1196,7 @@ impl PeerNetworkManager { /// Disconnect a specific peer pub async fn disconnect_peer(&self, addr: &SocketAddr, reason: &str) -> Result<(), Error> { - log::info!("Disconnecting peer {} - reason: {}", addr, reason); + tracing::info!("Disconnecting peer {} - reason: {}", addr, reason); // Remove the peer self.pool.remove_peer(addr).await; @@ -1212,7 +1212,7 @@ impl PeerNetworkManager { /// Ban a specific peer manually pub async fn ban_peer(&self, addr: &SocketAddr, reason: &str) -> Result<(), Error> { - log::info!("Manually banning peer {} - reason: {}", addr, reason); + tracing::info!("Manually banning peer {} - reason: {}", addr, reason); // Disconnect the peer first self.disconnect_peer(addr, reason).await?; @@ -1236,20 +1236,20 @@ impl PeerNetworkManager { /// Shutdown the network manager pub async fn shutdown(&self) { - log::info!("Shutting down peer network manager"); + tracing::info!("Shutting down peer network manager"); self.shutdown_token.cancel(); // Save known peers before shutdown let addresses = self.addrv2_handler.get_addresses_for_peer(MAX_ADDR_TO_STORE).await; if !addresses.is_empty() { if let Err(e) = self.peer_store.save_peers(&addresses).await { - log::warn!("Failed to save peers on shutdown: {}", e); + tracing::warn!("Failed to save peers on shutdown: {}", e); } } // Save reputation data before shutdown if let Err(e) = self.reputation_manager.save_to_storage(&*self.peer_store).await { - log::warn!("Failed to save reputation data on shutdown: {}", e); + tracing::warn!("Failed to save reputation data on shutdown: {}", e); } // Drain tasks while holding the lock. connect_to_peer() already uses @@ -1359,14 +1359,14 @@ impl NetworkManager for PeerNetworkManager { async fn penalize_peer_invalid_chainlock(&self, address: SocketAddr, reason: &str) { match self.disconnect_peer(&address, reason).await { Ok(()) => { - log::warn!( + tracing::warn!( "Peer {} disconnected for invalid ChainLock enforcement: {}", address, reason ); } Err(err) => { - log::error!( + tracing::error!( "Failed to disconnect peer {} after invalid ChainLock enforcement ({}): {}", address, reason, @@ -1399,14 +1399,14 @@ impl NetworkManager for PeerNetworkManager { match self.disconnect_peer(&address, reason).await { Ok(()) => { - log::warn!( + tracing::warn!( "Peer {} disconnected for invalid InstantLock enforcement: {}", address, reason ); } Err(err) => { - log::error!( + tracing::error!( "Failed to disconnect peer {} after invalid InstantLock enforcement ({}): {}", address, reason, diff --git a/dash-spv/src/sync/sync_manager.rs b/dash-spv/src/sync/sync_manager.rs index 62d41f0ef..dea85dad8 100644 --- a/dash-spv/src/sync/sync_manager.rs +++ b/dash-spv/src/sync/sync_manager.rs @@ -63,24 +63,6 @@ impl SyncManagerTaskContext { } } - /// Handle a fatal network error: log, update progress, and emit `ManagerError`. - /// - /// The caller must call `set_state(SyncState::WaitingForConnections)` before - /// this method so that `progress` reflects the reset state. 
- pub(super) fn handle_fatal_network_error( - &self, - manager: ManagerIdentifier, - source: &str, - progress: SyncManagerProgress, - msg: &str, - ) { - tracing::warn!("{} {} fatal network error, resetting to WaitingForConnections: {}", manager, source, msg); - self.progress_sender.send(progress).ok(); - self.emit_sync_event(SyncEvent::ManagerError { - manager, - error: format!("Fatal network error ({}): {}", source, msg), - }); - } } #[async_trait] @@ -213,6 +195,28 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } } + /// Reset to `WaitingForConnections`, report progress, and emit a `ManagerError` event. + /// + /// This bundles the state reset and error reporting into one call so that + /// `set_state` is always invoked before the progress snapshot — eliminating + /// the ordering bug where callers could forget to reset state first. + fn recover_from_network_error( + &mut self, + context: &SyncManagerTaskContext, + source: &str, + msg: &str, + ) { + self.set_state(SyncState::WaitingForConnections); + let progress = self.progress(); + let identifier = self.identifier(); + tracing::warn!("{} {} network error, resetting to WaitingForConnections: {}", identifier, source, msg); + context.progress_sender.send(progress).ok(); + context.emit_sync_event(SyncEvent::ManagerError { + manager: identifier, + error: format!("Network error ({}): {}", source, msg), + }); + } + /// Run the manager task, processing messages, events, and periodic ticks. /// /// This consumes the manager and runs until shutdown is signaled. @@ -232,6 +236,11 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { // Tick interval for periodic housekeeping let mut tick_interval = interval(Duration::from_millis(100)); + // Cooldown after a network-error recovery to avoid log/event flooding. + // While active, the periodic tick branch is skipped (message and event + // branches are externally driven and don't need throttling). 
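// A minimal sketch of the latch the comment above describes, illustrative only:
// `None` means "no recent network error"; an error stores `Some(now)`, which mutes
// the periodic tick branch until the window elapses and then clears itself.
// (`try_tick` is an assumed stand-in for the manager's real `tick` call.)
async fn tick_with_cooldown(mut try_tick: impl FnMut() -> Result<(), crate::error::SyncError>) {
    use std::time::Duration;
    let mut cooldown: Option<tokio::time::Instant> = None;
    let mut ticks = tokio::time::interval(Duration::from_millis(100));
    loop {
        ticks.tick().await;
        if let Some(since) = cooldown {
            if since.elapsed() < Duration::from_secs(2) {
                continue; // still cooling down: skip housekeeping this tick
            }
            cooldown = None; // window over: resume normal ticking
        }
        if let Err(crate::error::SyncError::Network(_)) = try_tick() {
            cooldown = Some(tokio::time::Instant::now()); // latch the timer on error
        }
    }
}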
+ let mut network_error_cooldown: Option<tokio::time::Instant> = None; + tracing::info!("{} task entering main loop", identifier); loop { @@ -254,9 +263,9 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } self.try_emit_progress(progress_before, &context.progress_sender); } - Err(SyncError::FatalNetwork(ref msg)) => { - self.set_state(SyncState::WaitingForConnections); - context.handle_fatal_network_error(identifier, "message handler", self.progress(), msg); + Err(SyncError::Network(ref msg)) => { + self.recover_from_network_error(&context, "message handler", msg); + network_error_cooldown = Some(tokio::time::Instant::now()); continue; } Err(e) => { @@ -285,9 +294,9 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } self.try_emit_progress(progress_before, &context.progress_sender); } - Err(SyncError::FatalNetwork(ref msg)) => { - self.set_state(SyncState::WaitingForConnections); - context.handle_fatal_network_error(identifier, "sync event handler", self.progress(), msg); + Err(SyncError::Network(ref msg)) => { + self.recover_from_network_error(&context, "sync event handler", msg); + network_error_cooldown = Some(tokio::time::Instant::now()); continue; } Err(e) => { @@ -317,9 +326,9 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } self.try_emit_progress(progress_before, &context.progress_sender); } - Err(SyncError::FatalNetwork(ref msg)) => { - self.set_state(SyncState::WaitingForConnections); - context.handle_fatal_network_error(identifier, "network event handler", self.progress(), msg); + Err(SyncError::Network(ref msg)) => { + self.recover_from_network_error(&context, "network event handler", msg); + network_error_cooldown = Some(tokio::time::Instant::now()); continue; } Err(e) => { @@ -335,6 +344,16 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } // Periodic tick for timeouts and housekeeping _ = tick_interval.tick() => { + // Skip tick processing while inside the network-error cooldown + // window. Message and event branches are externally driven and + // don't need this guard. + if let Some(since) = network_error_cooldown { + if since.elapsed() < Duration::from_secs(2) { + continue; + } + network_error_cooldown = None; + } + let progress_before = self.progress(); match self.tick(&context.requests).await { Ok(events) => { @@ -343,9 +362,9 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } self.try_emit_progress(progress_before, &context.progress_sender); } - Err(SyncError::FatalNetwork(ref msg)) => { - self.set_state(SyncState::WaitingForConnections); - context.handle_fatal_network_error(identifier, "tick", self.progress(), msg); + Err(SyncError::Network(ref msg)) => { + self.recover_from_network_error(&context, "tick", msg); + network_error_cooldown = Some(tokio::time::Instant::now()); continue; } Err(e) => { @@ -483,7 +502,7 @@ mod tests { assert!(tick_count.load(Ordering::Relaxed) > 0); } - /// Mock manager whose tick() returns SyncError::FatalNetwork after a threshold. + /// Mock manager whose tick() returns SyncError::Network after a threshold.
struct NetworkErrorManager { identifier: ManagerIdentifier, state: SyncState, @@ -536,14 +555,14 @@ mod tests { async fn tick(&mut self, _requests: &RequestSender) -> SyncResult<Vec<SyncEvent>> { let count = self.tick_count.fetch_add(1, Ordering::Relaxed); if count >= self.error_after { - Err(SyncError::FatalNetwork("channel closed".into())) + Err(SyncError::Network("channel closed".into())) } else { Ok(vec![]) } } } - /// Given a manager whose tick() returns SyncError::FatalNetwork after a few calls, + /// Given a manager whose tick() returns SyncError::Network after a few calls, /// When the task loop processes the error, /// Then it resets to WaitingForConnections and keeps running. #[tokio::test] async fn test_manager_resets_on_fatal_network_error() { @@ -578,18 +597,20 @@ mod tests { // Subscribe to sync events to verify ManagerError is emitted let mut event_rx = sync_event_sender.subscribe(); - // Spawn the task — it should keep running after the FatalNetwork error + // Spawn the task — it should keep running after the Network error let handle = tokio::spawn(async move { manager.run(context).await }); - // Wait long enough for the error to fire and several more ticks to occur - tokio::time::sleep(Duration::from_millis(500)).await; + // Wait long enough for the error to fire (tick 3 at ~300ms) plus the + // 2-second cooldown, plus a few more ticks after cooldown expires. + tokio::time::sleep(Duration::from_millis(2800)).await; // Verify progress state is WaitingForConnections (not Error) assert_eq!(progress_rx.borrow().state(), SyncState::WaitingForConnections); - // Verify tick was called more than 4 times (manager kept running after the error) + // Verify tick was called more than the error threshold (manager kept + // running after the Network error and cooldown expired). assert!(tick_count.load(Ordering::Relaxed) > 4, - "manager should keep ticking after FatalNetwork error"); + "manager should keep ticking after Network error and cooldown"); // Verify ManagerError event was emitted let mut found_error = false; From f48a3a5095862a77b949780cba0d4ffcc1fce314 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Mon, 16 Feb 2026 17:25:56 +0100 Subject: [PATCH 12/23] fmt --- dash-spv/src/network/manager.rs | 27 ++++++++++++++++++++++----- dash-spv/src/sync/sync_manager.rs | 14 ++++++++++---- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index a424e3693..87f1605c7 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -446,7 +446,11 @@ impl PeerNetworkManager { let response = addrv2_handler.build_addr_response().await; let mut peer_guard = peer.write().await; if let Err(e) = peer_guard.send_message(response).await { - tracing::error!("Failed to send addr response to {}: {}", addr, e); + tracing::error!( + "Failed to send addr response to {}: {}", + addr, + e + ); } continue; // Don't forward GetAddr to client } @@ -590,7 +594,11 @@ impl PeerNetworkManager { } _ => { // Forward other messages to client - tracing::trace!("Forwarding {:?} from {} to client", msg.cmd(), addr); + tracing::trace!( + "Forwarding {:?} from {} to client", + msg.cmd(), + addr + ); } } @@ -651,7 +659,10 @@ impl PeerNetworkManager { { let block_hash = &error_msg [hash_start + 6..hash_start + 6 + hash_end]; - tracing::error!("FAILING BLOCK HASH: {}", block_hash); + tracing::error!( + "FAILING BLOCK HASH: {}", + block_hash + ); } } } else if error_msg.contains("IO error") { @@ -934,7 +945,10 @@ impl PeerNetworkManager { addr } None => {
- tracing::warn!("No peers support compact filters, cannot send {}", message.cmd()); + tracing::warn!( + "No peers support compact filters, cannot send {}", + message.cmd() + ); return Err(NetworkError::ProtocolError( "No peers support compact filters".to_string(), )); @@ -1043,7 +1057,10 @@ impl PeerNetworkManager { ); } NetworkMessage::SendHeaders2 => { - tracing::info!("🤝 Sending SendHeaders2 to {} - requesting compressed headers", addr); + tracing::info!( + "🤝 Sending SendHeaders2 to {} - requesting compressed headers", + addr + ); } _ => { tracing::trace!("Sending {:?} to {}", message.cmd(), addr); diff --git a/dash-spv/src/sync/sync_manager.rs b/dash-spv/src/sync/sync_manager.rs index dea85dad8..6b40df52c 100644 --- a/dash-spv/src/sync/sync_manager.rs +++ b/dash-spv/src/sync/sync_manager.rs @@ -62,7 +62,6 @@ impl SyncManagerTaskContext { self.emit_sync_event(event); } } - } #[async_trait] @@ -209,7 +208,12 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { self.set_state(SyncState::WaitingForConnections); let progress = self.progress(); let identifier = self.identifier(); - tracing::warn!("{} {} network error, resetting to WaitingForConnections: {}", identifier, source, msg); + tracing::warn!( + "{} {} network error, resetting to WaitingForConnections: {}", + identifier, + source, + msg + ); context.progress_sender.send(progress).ok(); context.emit_sync_event(SyncEvent::ManagerError { manager: identifier, @@ -609,8 +613,10 @@ mod tests { // Verify tick was called more than the error threshold (manager kept // running after the Network error and cooldown expired). - assert!(tick_count.load(Ordering::Relaxed) > 4, - "manager should keep ticking after Network error and cooldown"); + assert!( + tick_count.load(Ordering::Relaxed) > 4, + "manager should keep ticking after Network error and cooldown" + ); // Verify ManagerError event was emitted let mut found_error = false; From 8e22175231bdfbf61a73eb56a3a7a0cda0a97078 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Tue, 17 Feb 2026 08:40:44 +0100 Subject: [PATCH 13/23] refactor(spv): extract network error cooldown into named constant Replace inline `Duration::from_secs(2)` with `NETWORK_ERROR_COOLDOWN` constant for clarity. Co-Authored-By: Claude Opus 4.6 --- dash-spv/src/sync/sync_manager.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/dash-spv/src/sync/sync_manager.rs b/dash-spv/src/sync/sync_manager.rs index 6b40df52c..6650ac4a7 100644 --- a/dash-spv/src/sync/sync_manager.rs +++ b/dash-spv/src/sync/sync_manager.rs @@ -13,6 +13,10 @@ use async_trait::async_trait; /// and communicates with other managers via events. Managers progress independently and /// catch up to each other as events flow between them. use std::time::Duration; + +/// Cooldown period after a network-error recovery before tick processing resumes. +/// Prevents log/event flooding when the network is persistently down. +const NETWORK_ERROR_COOLDOWN: Duration = Duration::from_secs(2); use tokio::sync::broadcast; use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::watch; @@ -352,7 +356,7 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { // window. Message and event branches are externally driven and // don't need this guard. 
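// Illustrative sketch only, not from the patch: with the literal extracted, the
// guard compares against a named policy value, and any test that needs the window
// length can reference the same constant (helper name `cooldown_expired` assumed):
fn cooldown_expired(since: tokio::time::Instant) -> bool {
    since.elapsed() >= NETWORK_ERROR_COOLDOWN
}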
if let Some(since) = network_error_cooldown { - if since.elapsed() < Duration::from_secs(2) { + if since.elapsed() < NETWORK_ERROR_COOLDOWN { continue; } network_error_cooldown = None; From 887206289a58f6abd692b087839a3f4144bc659c Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Tue, 17 Feb 2026 08:50:45 +0100 Subject: [PATCH 14/23] fmt --- dash-spv/src/network/manager.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index f735e4dfb..a85cad9cb 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -889,7 +889,8 @@ impl PeerNetworkManager { let mut tasks = self.tasks.lock().await; tasks.spawn(async move { // Periodic DNS discovery check (only active in non-exclusive mode) - let mut dns_interval = time::interval_at(Instant::now() + DNS_DISCOVERY_DELAY, DNS_DISCOVERY_DELAY); + let mut dns_interval = + time::interval_at(Instant::now() + DNS_DISCOVERY_DELAY, DNS_DISCOVERY_DELAY); // Periodic reconnection check (active in both modes) let mut maintenance_interval = time::interval(MAINTENANCE_INTERVAL); From c3fa6681a8a41199636ac964895ecbe294ea21be Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Tue, 17 Feb 2026 08:58:12 +0100 Subject: [PATCH 15/23] fix(spv): restore shutdown checks dropped during upstream refactor The upstream refactor (#430) extracted maintenance loop logic into maintenance_tick() and dns_fallback_tick() methods but dropped the shutdown_token checks that this branch had added inside the loops. Restore them so shutdown is not delayed by in-progress peer connections or pings. Co-Authored-By: Claude Opus 4.6 --- dash-spv/src/network/manager.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index a85cad9cb..e40c7815f 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -826,6 +826,8 @@ impl PeerNetworkManager { } } + if self.shutdown_token.is_cancelled() { return; } + // Send ping to all peers if needed for (addr, peer) in self.pool.get_all_peers().await { let mut peer_guard = peer.write().await; From ddc539f7a0e0858c45f43569c5d08daa55c7237a Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Tue, 17 Feb 2026 09:02:48 +0100 Subject: [PATCH 16/23] fmt --- dash-spv/src/network/manager.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index e40c7815f..aae629a64 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -826,7 +826,9 @@ impl PeerNetworkManager { } } - if self.shutdown_token.is_cancelled() { return; } + if self.shutdown_token.is_cancelled() { + return; + } // Send ping to all peers if needed for (addr, peer) in self.pool.get_all_peers().await { From 531eb11666c9e7f692d9bc1416d17ad449e30c56 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Tue, 17 Feb 2026 09:10:49 +0100 Subject: [PATCH 17/23] revert: CLAUDE.md logging info --- dash-spv/CLAUDE.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/dash-spv/CLAUDE.md b/dash-spv/CLAUDE.md index 97c490513..dacd9771e 100644 --- a/dash-spv/CLAUDE.md +++ b/dash-spv/CLAUDE.md @@ -168,9 +168,6 @@ Use domain-specific error types: - **Built on**: `dashcore` library with Dash-specific features enabled - **Async runtime**: Tokio 
with full feature set -### Logging Convention -The project uses the `tracing` framework for structured logging. New code should use `tracing::` macros (`tracing::info!`, `tracing::debug!`, `tracing::warn!`, `tracing::error!`). - ## Key Implementation Details ### Storage Architecture From 6affdaa5db30c04f533cfac4a81b9939d1cf2545 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Tue, 17 Feb 2026 09:11:18 +0100 Subject: [PATCH 18/23] revert logging changes --- dash-spv/src/client/lifecycle.rs | 2 +- dash-spv/src/network/manager.rs | 177 ++++++++++++++---------------- dash-spv/src/sync/sync_manager.rs | 2 +- 3 files changed, 82 insertions(+), 99 deletions(-) diff --git a/dash-spv/src/client/lifecycle.rs b/dash-spv/src/client/lifecycle.rs index 5adf7e998..307d9f22a 100644 --- a/dash-spv/src/client/lifecycle.rs +++ b/dash-spv/src/client/lifecycle.rs @@ -196,7 +196,7 @@ impl DashSpvClient Result<(), Error> { - tracing::info!("Starting peer network manager for {:?}", self.network); + log::info!("Starting peer network manager for {:?}", self.network); let mut peer_addresses: Vec = self .initial_peers @@ -158,21 +158,21 @@ impl PeerNetworkManager { .collect(); if self.exclusive_mode { - tracing::info!( + log::info!( "Exclusive peer mode: connecting ONLY to {} specified peer(s)", self.initial_peers.len() ); } else { // Load saved peers from disk let saved_peers = self.peer_store.load_peers().await.unwrap_or_else(|e| { - tracing::warn!("Failed to load peers: {}", e); + log::warn!("Failed to load peers: {}", e); Vec::new() }); peer_addresses.extend(saved_peers); // If we still have no peers, immediately discover via DNS if peer_addresses.is_empty() { - tracing::info!( + log::info!( "No peers configured, performing immediate DNS discovery for {:?}", self.network ); @@ -184,13 +184,13 @@ impl PeerNetworkManager { .take(TARGET_PEERS) .map(|addr| AddrV2Message::new(addr, ServiceFlags::NETWORK)), ); - tracing::info!( + log::info!( "DNS discovery found {} peers, using {} for startup", dns_peers_found, peer_addresses.len() ); } else { - tracing::info!( + log::info!( "Starting with {} peers from disk (DNS discovery will be used later if needed)", peer_addresses.len() ); @@ -212,7 +212,7 @@ impl PeerNetworkManager { async fn connect_to_peer(&self, addr: SocketAddr) { // Check reputation first if !self.reputation_manager.should_connect_to_peer(&addr).await { - tracing::warn!("Not connecting to {} due to bad reputation", addr); + log::warn!("Not connecting to {} due to bad reputation", addr); return; } @@ -250,12 +250,12 @@ impl PeerNetworkManager { } }; tasks.spawn(async move { - tracing::debug!("Attempting to connect to {}", addr); + log::debug!("Attempting to connect to {}", addr); let connect_result = tokio::select! 
{ result = Peer::connect(addr, CONNECTION_TIMEOUT.as_secs(), network) => result, _ = shutdown_token.cancelled() => { - tracing::debug!("Connection to {} cancelled by shutdown", addr); + log::debug!("Connection to {} cancelled by shutdown", addr); pool.remove_peer(&addr).await; return; } @@ -268,11 +268,11 @@ impl PeerNetworkManager { HandshakeManager::new(network, mempool_strategy, user_agent); match handshake_manager.perform_handshake(&mut peer).await { Ok(_) => { - tracing::info!("Successfully connected to {}", addr); + log::info!("Successfully connected to {}", addr); // Request addresses from the peer for discovery if let Err(e) = peer.send_message(NetworkMessage::GetAddr).await { - tracing::warn!("Failed to send GetAddr to {}: {}", addr, e); + log::warn!("Failed to send GetAddr to {}: {}", addr, e); } // Record successful connection @@ -280,7 +280,7 @@ impl PeerNetworkManager { // Add to pool if let Err(e) = pool.add_peer(addr, peer).await { - tracing::error!("Failed to add peer to pool: {}", e); + log::error!("Failed to add peer to pool: {}", e); return; } @@ -318,7 +318,7 @@ impl PeerNetworkManager { .await; } Err(e) => { - tracing::warn!("Handshake failed with {}: {}", addr, e); + log::warn!("Handshake failed with {}: {}", addr, e); pool.remove_peer(&addr).await; // Update reputation for handshake failure reputation_manager @@ -334,7 +334,7 @@ impl PeerNetworkManager { } } Err(e) => { - tracing::debug!("Failed to connect to {}: {}", addr, e); + log::debug!("Failed to connect to {}: {}", addr, e); pool.remove_peer(&addr).await; // Minor reputation penalty for connection failure reputation_manager @@ -363,7 +363,7 @@ impl PeerNetworkManager { network_event_sender: broadcast::Sender, ) { tokio::spawn(async move { - tracing::debug!("Starting peer reader loop for {}", addr); + log::debug!("Starting peer reader loop for {}", addr); let mut loop_iteration = 0; let mut headers2_state = CompressionState::default(); @@ -372,7 +372,7 @@ impl PeerNetworkManager { // Check shutdown signal first with detailed logging if shutdown_token.is_cancelled() { - tracing::info!("Breaking peer reader loop for {} - shutdown signal received (iteration {})", addr, loop_iteration); + log::info!("Breaking peer reader loop for {} - shutdown signal received (iteration {})", addr, loop_iteration); break; } @@ -380,7 +380,7 @@ impl PeerNetworkManager { let peer = match pool.get_peer(&addr).await { Some(peer) => peer, None => { - tracing::warn!("Breaking peer reader loop for {} - peer no longer in pool (iteration {})", addr, loop_iteration); + log::warn!("Breaking peer reader loop for {} - peer no longer in pool (iteration {})", addr, loop_iteration); break; } }; @@ -390,7 +390,7 @@ impl PeerNetworkManager { // Try to get a read lock first to check if peer is available let peer_guard = peer.read().await; if !peer_guard.is_connected() { - tracing::warn!("Breaking peer reader loop for {} - peer no longer connected (iteration {})", addr, loop_iteration); + log::warn!("Breaking peer reader loop for {} - peer no longer connected (iteration {})", addr, loop_iteration); drop(peer_guard); break; } @@ -406,7 +406,7 @@ impl PeerNetworkManager { Ok(None) }, _ = shutdown_token.cancelled() => { - tracing::info!("Breaking peer reader loop for {} - shutdown signal received while reading (iteration {})", addr, loop_iteration); + log::info!("Breaking peer reader loop for {} - shutdown signal received while reading (iteration {})", addr, loop_iteration); break; } } @@ -415,7 +415,7 @@ impl PeerNetworkManager { match msg_result { 
Ok(Some(msg)) => { // Log all received messages at debug level to help troubleshoot - tracing::debug!("Received {:?} from {}", msg.cmd(), addr); + log::debug!("Received {:?} from {}", msg.cmd(), addr); // Handle some messages directly match &msg.inner() { @@ -425,7 +425,7 @@ impl PeerNetworkManager { } NetworkMessage::SendHeaders2 => { // Peer is indicating they will send us compressed headers - tracing::info!( + log::info!( "Peer {} sent SendHeaders2 - they will send compressed headers", addr ); @@ -439,7 +439,7 @@ impl PeerNetworkManager { continue; // Don't forward to client } NetworkMessage::GetAddr => { - tracing::trace!( + log::trace!( "Received GetAddr from {}, sending known addresses", addr ); @@ -447,11 +447,7 @@ impl PeerNetworkManager { let response = addrv2_handler.build_addr_response().await; let mut peer_guard = peer.write().await; if let Err(e) = peer_guard.send_message(response).await { - tracing::error!( - "Failed to send addr response to {}: {}", - addr, - e - ); + log::error!("Failed to send addr response to {}: {}", addr, e); } continue; // Don't forward GetAddr to client } @@ -459,10 +455,10 @@ impl PeerNetworkManager { // Handle ping directly let mut peer_guard = peer.write().await; if let Err(e) = peer_guard.handle_ping(*nonce).await { - tracing::error!("Failed to handle ping from {}: {}", addr, e); + log::error!("Failed to handle ping from {}: {}", addr, e); // If we can't send pong, connection is likely broken if matches!(e, NetworkError::ConnectionFailed(_)) { - tracing::warn!("Breaking peer reader loop for {} - failed to send pong response (iteration {})", addr, loop_iteration); + log::warn!("Breaking peer reader loop for {} - failed to send pong response (iteration {})", addr, loop_iteration); break; } } @@ -472,13 +468,13 @@ impl PeerNetworkManager { // Handle pong directly let mut peer_guard = peer.write().await; if let Err(e) = peer_guard.handle_pong(*nonce) { - tracing::error!("Failed to handle pong from {}: {}", addr, e); + log::error!("Failed to handle pong from {}: {}", addr, e); } continue; // Don't forward pong to client } NetworkMessage::Version(_) | NetworkMessage::Verack => { // These are handled during handshake, ignore here - tracing::trace!( + log::trace!( "Ignoring handshake message {:?} from {}", msg.cmd(), addr @@ -504,7 +500,7 @@ impl PeerNetworkManager { }) .collect(); if !converted.is_empty() { - tracing::debug!( + log::debug!( "Converted {} legacy addr entries from {}", converted.len(), addr @@ -515,7 +511,7 @@ impl PeerNetworkManager { } NetworkMessage::Headers(headers) => { // Log headers messages specifically - tracing::info!( + log::info!( "📨 Received Headers message from {} with {} headers! 
(regular uncompressed)", addr, headers.len() @@ -523,14 +519,14 @@ impl PeerNetworkManager { // Check if peer supports headers2 let peer_guard = peer.read().await; if peer_guard.supports_headers2() { - tracing::warn!("⚠️ Peer {} supports headers2 but sent regular headers - possible protocol issue", addr); + log::warn!("⚠️ Peer {} supports headers2 but sent regular headers - possible protocol issue", addr); } drop(peer_guard); // Forward to client } NetworkMessage::Headers2(headers2) => { // Decompress headers in network layer and forward as regular Headers - tracing::info!( + log::info!( "Received Headers2 from {} with {} compressed headers - decompressing", addr, headers2.headers.len() @@ -538,7 +534,7 @@ impl PeerNetworkManager { match headers2_state.process_headers(&headers2.headers) { Ok(headers) => { - tracing::info!( + log::info!( "Decompressed {} headers from {} - forwarding as regular Headers", headers.len(), addr @@ -550,7 +546,7 @@ impl PeerNetworkManager { continue; // Already sent, don't forward the original Headers2 } Err(e) => { - tracing::error!( + log::error!( "Headers2 decompression failed from {}: {} - disabling headers2", addr, e @@ -570,7 +566,7 @@ impl PeerNetworkManager { } NetworkMessage::GetHeaders(_) => { // SPV clients don't serve headers to peers - tracing::debug!( + log::debug!( "Received GetHeaders from {} - ignoring (SPV client)", addr ); @@ -578,7 +574,7 @@ impl PeerNetworkManager { } NetworkMessage::GetHeaders2(_) => { // SPV clients don't serve compressed headers to peers - tracing::debug!( + log::debug!( "Received GetHeaders2 from {} - ignoring (SPV client)", addr ); @@ -589,17 +585,13 @@ impl PeerNetworkManager { payload, } => { // Log unknown messages with more detail - tracing::warn!("Received unknown message from {}: command='{}', payload_len={}", + log::warn!("Received unknown message from {}: command='{}', payload_len={}", addr, command, payload.len()); // Still forward to client } _ => { // Forward other messages to client - tracing::trace!( - "Forwarding {:?} from {} to client", - msg.cmd(), - addr - ); + log::trace!("Forwarding {:?} from {} to client", msg.cmd(), addr); } } @@ -613,11 +605,11 @@ impl PeerNetworkManager { Err(e) => { match e { NetworkError::PeerDisconnected => { - tracing::info!("Peer {} disconnected", addr); + log::info!("Peer {} disconnected", addr); break; } NetworkError::Timeout => { - tracing::debug!("Timeout reading from {}, continuing...", addr); + log::debug!("Timeout reading from {}, continuing...", addr); // Minor reputation penalty for timeout reputation_manager .update_reputation( @@ -629,14 +621,14 @@ impl PeerNetworkManager { continue; } _ => { - tracing::error!("Fatal error reading from {}: {}", addr, e); + log::error!("Fatal error reading from {}: {}", addr, e); // Check if this is a serialization error that might have context if let NetworkError::Serialization(ref decode_error) = e { let error_msg = decode_error.to_string(); if error_msg.contains("unknown special transaction type") { - tracing::warn!("Peer {} sent block with unsupported transaction type: {}", addr, decode_error); - tracing::error!( + log::warn!("Peer {} sent block with unsupported transaction type: {}", addr, decode_error); + log::error!( "BLOCK DECODE FAILURE - Error details: {}", error_msg ); @@ -652,7 +644,7 @@ impl PeerNetworkManager { .contains("Failed to decode transactions for block") { // The error now includes the block hash - tracing::error!("Peer {} sent block that failed transaction decoding: {}", addr, decode_error); + 
log::error!("Peer {} sent block that failed transaction decoding: {}", addr, decode_error); // Try to extract the block hash from the error message if let Some(hash_start) = error_msg.find("block ") { if let Some(hash_end) = @@ -660,22 +652,19 @@ impl PeerNetworkManager { { let block_hash = &error_msg [hash_start + 6..hash_start + 6 + hash_end]; - tracing::error!( - "FAILING BLOCK HASH: {}", - block_hash - ); + log::error!("FAILING BLOCK HASH: {}", block_hash); } } } else if error_msg.contains("IO error") { // This might be our wrapped error - log it prominently - tracing::error!("BLOCK DECODE FAILURE - IO error (possibly unknown transaction type) from peer {}", addr); - tracing::error!( + log::error!("BLOCK DECODE FAILURE - IO error (possibly unknown transaction type) from peer {}", addr); + log::error!( "Serialization error from {}: {}", addr, decode_error ); } else { - tracing::error!( + log::error!( "Serialization error from {}: {}", addr, decode_error @@ -691,7 +680,7 @@ impl PeerNetworkManager { } // Remove from pool - tracing::warn!("Disconnecting from {} (peer reader loop ended)", addr); + log::warn!("Disconnecting from {} (peer reader loop ended)", addr); let removed = pool.remove_peer(&addr).await; if removed.is_some() { // Decrement connected peer counter when a peer is removed @@ -733,7 +722,7 @@ impl PeerNetworkManager { }; let Some(mut request_rx) = request_rx else { - tracing::warn!("Request processor already started or receiver unavailable"); + log::warn!("Request processor already started or receiver unavailable"); return; }; @@ -742,13 +731,13 @@ impl PeerNetworkManager { let mut tasks = self.tasks.lock().await; tasks.spawn(async move { - tracing::info!("Starting request processor task"); + log::info!("Starting request processor task"); loop { tokio::select! { request = request_rx.recv() => { match request { Some(NetworkRequest::SendMessage(msg)) => { - tracing::debug!("Request processor: sending {}", msg.cmd()); + log::debug!("Request processor: sending {}", msg.cmd()); // Spawn each send concurrently to allow parallel requests across peers. 
let this = this.clone(); tokio::spawn(async move { @@ -768,18 +757,18 @@ impl PeerNetworkManager { } }; if let Err(e) = result { - tracing::error!("Request processor: failed to send message: {}", e); + log::error!("Request processor: failed to send message: {}", e); } }); } None => { - tracing::info!("Request processor: channel closed"); + log::info!("Request processor: channel closed"); break; } } } _ = shutdown_token.cancelled() => { - tracing::info!("Request processor: shutting down"); + log::info!("Request processor: shutting down"); break; } } @@ -908,7 +897,7 @@ impl PeerNetworkManager { this.dns_fallback_tick().await; } _ = this.shutdown_token.cancelled() => { - tracing::info!("Maintenance loop shutting down"); + log::info!("Maintenance loop shutting down"); break; } } @@ -944,14 +933,11 @@ impl PeerNetworkManager { match filter_peer { Some(addr) => { - tracing::debug!("Selected peer {} for compact filter request", addr); + log::debug!("Selected peer {} for compact filter request", addr); addr } None => { - tracing::warn!( - "No peers support compact filters, cannot send {}", - message.cmd() - ); + log::warn!("No peers support compact filters, cannot send {}", message.cmd()); return Err(NetworkError::ProtocolError( "No peers support compact filters".to_string(), )); @@ -983,7 +969,7 @@ impl PeerNetworkManager { let chosen = selected.unwrap_or(peers[0].0); if Some(chosen) != *current_sync_peer { - tracing::info!("Sync peer selected for Headers2: {}", chosen); + log::info!("Sync peer selected for Headers2: {}", chosen); *current_sync_peer = Some(chosen); } drop(current_sync_peer); @@ -999,7 +985,7 @@ impl PeerNetworkManager { } else { // Current sync peer disconnected, pick a new one let new_addr = peers[0].0; - tracing::info!( + log::info!( "Sync peer switched from {} to {} (previous peer disconnected)", current_addr, new_addr @@ -1010,7 +996,7 @@ impl PeerNetworkManager { } else { // No current sync peer, pick the first available let new_addr = peers[0].0; - tracing::info!("Sync peer selected: {}", new_addr); + log::info!("Sync peer selected: {}", new_addr); *current_sync_peer = Some(new_addr); new_addr }; @@ -1034,7 +1020,7 @@ impl PeerNetworkManager { if !self.headers2_disabled.lock().await.contains(addr) && peer_supports_headers2 => { - tracing::debug!( + log::debug!( "Upgrading GetHeaders to GetHeaders2 for peer {}: {:?}", addr, get_headers @@ -1048,10 +1034,10 @@ impl PeerNetworkManager { NetworkMessage::GetHeaders(_) | NetworkMessage::GetCFilters(_) | NetworkMessage::GetCFHeaders(_) => { - tracing::debug!("Sending {} to {}", message.cmd(), addr); + log::debug!("Sending {} to {}", message.cmd(), addr); } NetworkMessage::GetHeaders2(gh2) => { - tracing::info!("📤 Sending GetHeaders2 to {} - version: {}, locator_count: {}, locator: {:?}, stop: {}", + log::info!("📤 Sending GetHeaders2 to {} - version: {}, locator_count: {}, locator: {:?}, stop: {}", addr, gh2.version, gh2.locator_hashes.len(), @@ -1060,13 +1046,10 @@ impl PeerNetworkManager { ); } NetworkMessage::SendHeaders2 => { - tracing::info!( - "🤝 Sending SendHeaders2 to {} - requesting compressed headers", - addr - ); + log::info!("🤝 Sending SendHeaders2 to {} - requesting compressed headers", addr); } _ => { - tracing::trace!("Sending {:?} to {}", message.cmd(), addr); + log::trace!("Sending {:?} to {}", message.cmd(), addr); } } @@ -1152,7 +1135,7 @@ impl PeerNetworkManager { peer_guard.can_request_headers2() }; if peer_supports_headers2 && !self.headers2_disabled.lock().await.contains(addr) { - tracing::debug!("Upgrading 
GetHeaders to GetHeaders2 for peer {}", addr); + log::debug!("Upgrading GetHeaders to GetHeaders2 for peer {}", addr); NetworkMessage::GetHeaders2(get_headers) } else { NetworkMessage::GetHeaders(get_headers) @@ -1161,7 +1144,7 @@ impl PeerNetworkManager { other => other, }; - tracing::debug!( + log::debug!( "Distributing {} request to peer {} (round-robin idx {})", message.cmd(), addr, @@ -1185,10 +1168,10 @@ impl PeerNetworkManager { // Reduce verbosity for common sync messages match &message { NetworkMessage::GetHeaders(_) | NetworkMessage::GetCFilters(_) => { - tracing::debug!("Broadcasting {} to {}", message.cmd(), addr); + log::debug!("Broadcasting {} to {}", message.cmd(), addr); } _ => { - tracing::trace!("Broadcasting {:?} to {}", message.cmd(), addr); + log::trace!("Broadcasting {:?} to {}", message.cmd(), addr); } } let msg = message.clone(); @@ -1216,7 +1199,7 @@ impl PeerNetworkManager { /// Disconnect a specific peer pub async fn disconnect_peer(&self, addr: &SocketAddr, reason: &str) -> Result<(), Error> { - tracing::info!("Disconnecting peer {} - reason: {}", addr, reason); + log::info!("Disconnecting peer {} - reason: {}", addr, reason); // Remove the peer self.pool.remove_peer(addr).await; @@ -1232,7 +1215,7 @@ impl PeerNetworkManager { /// Ban a specific peer manually pub async fn ban_peer(&self, addr: &SocketAddr, reason: &str) -> Result<(), Error> { - tracing::info!("Manually banning peer {} - reason: {}", addr, reason); + log::info!("Manually banning peer {} - reason: {}", addr, reason); // Disconnect the peer first self.disconnect_peer(addr, reason).await?; @@ -1256,20 +1239,20 @@ impl PeerNetworkManager { /// Shutdown the network manager pub async fn shutdown(&self) { - tracing::info!("Shutting down peer network manager"); + log::info!("Shutting down peer network manager"); self.shutdown_token.cancel(); // Save known peers before shutdown let addresses = self.addrv2_handler.get_addresses_for_peer(MAX_ADDR_TO_STORE).await; if !addresses.is_empty() { if let Err(e) = self.peer_store.save_peers(&addresses).await { - tracing::warn!("Failed to save peers on shutdown: {}", e); + log::warn!("Failed to save peers on shutdown: {}", e); } } // Save reputation data before shutdown if let Err(e) = self.reputation_manager.save_to_storage(&*self.peer_store).await { - tracing::warn!("Failed to save reputation data on shutdown: {}", e); + log::warn!("Failed to save reputation data on shutdown: {}", e); } // Drain tasks while holding the lock. 
connect_to_peer() already uses @@ -1278,7 +1261,7 @@ impl PeerNetworkManager { let mut tasks = self.tasks.lock().await; while let Some(result) = tasks.join_next().await { if let Err(e) = result { - tracing::error!("Task join error: {}", e); + log::error!("Task join error: {}", e); } } @@ -1379,14 +1362,14 @@ impl NetworkManager for PeerNetworkManager { async fn penalize_peer_invalid_chainlock(&self, address: SocketAddr, reason: &str) { match self.disconnect_peer(&address, reason).await { Ok(()) => { - tracing::warn!( + log::warn!( "Peer {} disconnected for invalid ChainLock enforcement: {}", address, reason ); } Err(err) => { - tracing::error!( + log::error!( "Failed to disconnect peer {} after invalid ChainLock enforcement ({}): {}", address, reason, @@ -1419,14 +1402,14 @@ impl NetworkManager for PeerNetworkManager { match self.disconnect_peer(&address, reason).await { Ok(()) => { - tracing::warn!( + log::warn!( "Peer {} disconnected for invalid InstantLock enforcement: {}", address, reason ); } Err(err) => { - tracing::error!( + log::error!( "Failed to disconnect peer {} after invalid InstantLock enforcement ({}): {}", address, reason, diff --git a/dash-spv/src/sync/sync_manager.rs b/dash-spv/src/sync/sync_manager.rs index 6650ac4a7..568bc972d 100644 --- a/dash-spv/src/sync/sync_manager.rs +++ b/dash-spv/src/sync/sync_manager.rs @@ -212,7 +212,7 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { self.set_state(SyncState::WaitingForConnections); let progress = self.progress(); let identifier = self.identifier(); - tracing::warn!( + log::warn!( "{} {} network error, resetting to WaitingForConnections: {}", identifier, source, From ebdfbe3beaa1a86a3889730119145191806f97a2 Mon Sep 17 00:00:00 2001 From: lklimek <842586+lklimek@users.noreply.github.com> Date: Tue, 17 Feb 2026 09:25:01 +0100 Subject: [PATCH 19/23] Update dash-spv/src/network/manager.rs --- dash-spv/src/network/manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index 91e8a84e3..aba96e63d 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -165,7 +165,7 @@ impl PeerNetworkManager { } else { // Load saved peers from disk let saved_peers = self.peer_store.load_peers().await.unwrap_or_else(|e| { - log::warn!("Failed to load peers: {}", e); + tracing::warn!("Failed to load peers: {}", e); Vec::new() }); peer_addresses.extend(saved_peers); From e6c01e8c4ac9718c90fd2b280115e22860a4e15f Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Tue, 17 Feb 2026 10:55:00 +0100 Subject: [PATCH 20/23] chore: fmt --- dash-spv/src/error.rs | 4 ---- dash-spv/src/sync/sync_manager.rs | 14 +++++++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/dash-spv/src/error.rs b/dash-spv/src/error.rs index 8622f7ddb..438047c66 100644 --- a/dash-spv/src/error.rs +++ b/dash-spv/src/error.rs @@ -211,10 +211,6 @@ pub enum SyncError { Timeout(String), /// Network-related errors (e.g., connection failures, protocol errors). - /// - /// When returned from a `SyncManager` method, the default `run()` loop - /// resets the manager to `WaitingForConnections` and applies a cooldown - /// to prevent log/event flooding. 
#[error("Network error: {0}")] Network(String), diff --git a/dash-spv/src/sync/sync_manager.rs b/dash-spv/src/sync/sync_manager.rs index 568bc972d..85f67726c 100644 --- a/dash-spv/src/sync/sync_manager.rs +++ b/dash-spv/src/sync/sync_manager.rs @@ -419,20 +419,18 @@ mod tests { fn identifier(&self) -> ManagerIdentifier { self.identifier } + fn state(&self) -> SyncState { self.state } + fn set_state(&mut self, state: SyncState) { self.state = state; } + fn wanted_message_types(&self) -> &'static [MessageType] { &[] } - fn progress(&self) -> SyncManagerProgress { - let mut progress = BlockHeadersProgress::default(); - progress.set_state(self.state); - SyncManagerProgress::BlockHeaders(progress) - } async fn handle_message( &mut self, @@ -456,6 +454,12 @@ mod tests { self.tick_count.fetch_add(1, Ordering::Relaxed); Ok(vec![]) } + + fn progress(&self) -> SyncManagerProgress { + let mut progress = BlockHeadersProgress::default(); + progress.set_state(self.state); + SyncManagerProgress::BlockHeaders(progress) + } } From d3d03055109e534c04565e16e1c21a883bc606f3 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Tue, 17 Feb 2026 11:44:08 +0100 Subject: [PATCH 21/23] fix: don't set SyncState::WaitingForConnections on network error --- dash-spv/src/sync/sync_manager.rs | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/dash-spv/src/sync/sync_manager.rs b/dash-spv/src/sync/sync_manager.rs index 85f67726c..e777aab7c 100644 --- a/dash-spv/src/sync/sync_manager.rs +++ b/dash-spv/src/sync/sync_manager.rs @@ -198,26 +198,19 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } } - /// Reset to `WaitingForConnections`, report progress, and emit a `ManagerError` event. + /// Log a network error and emit a `ManagerError` event. /// - /// This bundles the state reset and error reporting into one call so that - /// `set_state` is always invoked before the progress snapshot — eliminating - /// the ordering bug where callers could forget to reset state first. + /// State is intentionally left unchanged: the `PeersUpdated` event path + /// handles the transition to `WaitingForConnections` when all peers are lost. fn recover_from_network_error( &mut self, context: &SyncManagerTaskContext, source: &str, msg: &str, ) { - self.set_state(SyncState::WaitingForConnections); - let progress = self.progress(); let identifier = self.identifier(); - log::warn!( - "{} {} network error, resetting to WaitingForConnections: {}", - identifier, - source, - msg - ); + log::warn!("{} {} network error: {}", identifier, source, msg); + let progress = self.progress(); context.progress_sender.send(progress).ok(); context.emit_sync_event(SyncEvent::ManagerError { manager: identifier, @@ -576,7 +569,7 @@ mod tests { /// Given a manager whose tick() returns SyncError::Network after a few calls, /// When the task loop processes the error, - /// Then it resets to WaitingForConnections and keeps running. + /// Then it stays in its current state and keeps running. #[tokio::test] async fn test_manager_resets_on_fatal_network_error() { @@ -605,7 +598,7 @@ mod tests { // Wait long enough for the error to fire (tick 3 at ~300ms) plus the // 2-second cooldown, plus a few more ticks after cooldown expires.
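// Worked out, the 2800 ms figure decomposes as follows; the constants below are
// illustrative arithmetic, not patch code: the error fires on tick 3 of a 100 ms
// interval, then the 2 s cooldown must lapse, leaving roughly half a second of
// post-cooldown ticks to observe.
const TICK_MS: u64 = 100; // run()'s tick interval
const ERROR_AT_MS: u64 = 3 * TICK_MS; // error_after = 3 ticks
const COOLDOWN_MS: u64 = 2_000; // NETWORK_ERROR_COOLDOWN
const POST_COOLDOWN_MS: u64 = 2_800 - ERROR_AT_MS - COOLDOWN_MS; // = 500 ms of extra ticks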
tokio::time::sleep(Duration::from_millis(2800)).await; - // Verify progress state is WaitingForConnections (not Error) + // State stays at WaitingForConnections (set by initialize(), not changed by error recovery) assert_eq!(progress_rx.borrow().state(), SyncState::WaitingForConnections); // Verify tick was called more than the error threshold (manager kept // running after the Network error and cooldown expired). From b239c4ed4f618bc2ee214d84199239e76a46be19 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Tue, 17 Feb 2026 12:17:01 +0100 Subject: [PATCH 22/23] refactor: remove cooldown --- dash-spv/src/sync/sync_manager.rs | 37 +++++-------------------------- 1 file changed, 5 insertions(+), 32 deletions(-) diff --git a/dash-spv/src/sync/sync_manager.rs b/dash-spv/src/sync/sync_manager.rs index e777aab7c..9f35c70a4 100644 --- a/dash-spv/src/sync/sync_manager.rs +++ b/dash-spv/src/sync/sync_manager.rs @@ -13,10 +13,6 @@ use async_trait::async_trait; /// and communicates with other managers via events. Managers progress independently and /// catch up to each other as events flow between them. use std::time::Duration; - -/// Cooldown period after a network-error recovery before tick processing resumes. -/// Prevents log/event flooding when the network is persistently down. -const NETWORK_ERROR_COOLDOWN: Duration = Duration::from_secs(2); use tokio::sync::broadcast; use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::watch; @@ -237,11 +233,6 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { // Tick interval for periodic housekeeping let mut tick_interval = interval(Duration::from_millis(100)); - // Cooldown after a network-error recovery to avoid log/event flooding. - // While active, the periodic tick branch is skipped (message and event - // branches are externally driven and don't need throttling). - let mut network_error_cooldown: Option<tokio::time::Instant> = None; - tracing::info!("{} task entering main loop", identifier); loop { @@ -266,8 +257,6 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } Err(SyncError::Network(ref msg)) => { self.recover_from_network_error(&context, "message handler", msg); - network_error_cooldown = Some(tokio::time::Instant::now()); - continue; } Err(e) => { tracing::error!("{} error handling message: {}", identifier, e); @@ -297,8 +286,6 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } Err(SyncError::Network(ref msg)) => { self.recover_from_network_error(&context, "sync event handler", msg); - network_error_cooldown = Some(tokio::time::Instant::now()); - continue; } Err(e) => { tracing::error!("{} error handling event: {}", identifier, e); @@ -329,8 +316,6 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } Err(SyncError::Network(ref msg)) => { self.recover_from_network_error(&context, "network event handler", msg); - network_error_cooldown = Some(tokio::time::Instant::now()); - continue; } Err(e) => { tracing::error!("{} error handling network event: {}", identifier, e); @@ -345,16 +330,6 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } // Periodic tick for timeouts and housekeeping _ = tick_interval.tick() => { - // Skip tick processing while inside the network-error cooldown - // window. Message and event branches are externally driven and - // don't need this guard.
- if let Some(since) = network_error_cooldown { - if since.elapsed() < NETWORK_ERROR_COOLDOWN { - continue; - } - network_error_cooldown = None; - } - let progress_before = self.progress(); match self.tick(&context.requests).await { Ok(events) => { @@ -365,8 +340,6 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { } Err(SyncError::Network(ref msg)) => { self.recover_from_network_error(&context, "tick", msg); - network_error_cooldown = Some(tokio::time::Instant::now()); - continue; } Err(e) => { tracing::error!("{} tick error: {}", identifier, e); @@ -605,18 +578,18 @@ mod tests { // Spawn the task — it should keep running after the Network error let handle = tokio::spawn(async move { manager.run(context).await }); - // Wait long enough for the error to fire (tick 3 at ~300ms) plus the - // 2-second cooldown, plus a few more ticks after cooldown expires. - tokio::time::sleep(Duration::from_millis(2800)).await; + // Wait long enough for the error to fire (tick 3 at ~300ms) plus a + // few more ticks so we can verify the loop keeps running. + tokio::time::sleep(Duration::from_millis(600)).await; // State stays at WaitingForConnections (set by initialize(), not changed by error recovery) assert_eq!(progress_rx.borrow().state(), SyncState::WaitingForConnections); // Verify tick was called more than the error threshold (manager kept - // running after the Network error and cooldown expired). + // running after the Network error). assert!( tick_count.load(Ordering::Relaxed) > 4, - "manager should keep ticking after Network error and cooldown" + "manager should keep ticking after Network error" ); // Verify ManagerError event was emitted From 8f7eaccac8c522395aa37d5d4f8bfeca0be18b2e Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Wed, 18 Feb 2026 12:28:21 +0100 Subject: [PATCH 23/23] refactor: strip non-shutdown changes from PR 440 Revert network error recovery, doc comment changes, and sync_manager additions so this branch contains only shutdown/deadlock fixes. Non-shutdown improvements are moved to fix/spv-network-error-recovery. Co-Authored-By: Claude Opus 4.6 --- dash-spv/src/error.rs | 2 +- dash-spv/src/sync/events.rs | 2 +- dash-spv/src/sync/sync_manager.rs | 168 +----------------------------- 3 files changed, 3 insertions(+), 169 deletions(-) diff --git a/dash-spv/src/error.rs b/dash-spv/src/error.rs index 438047c66..eff214f6f 100644 --- a/dash-spv/src/error.rs +++ b/dash-spv/src/error.rs @@ -210,7 +210,7 @@ pub enum SyncError { #[error("Timeout error: {0}")] Timeout(String), - /// Network-related errors (e.g., connection failures, protocol errors). + /// Network-related errors (e.g., connection failures, protocol errors) #[error("Network error: {0}")] Network(String), diff --git a/dash-spv/src/sync/events.rs b/dash-spv/src/sync/events.rs index aad953512..11c9f66ac 100644 --- a/dash-spv/src/sync/events.rs +++ b/dash-spv/src/sync/events.rs @@ -109,7 +109,7 @@ pub enum SyncEvent { height: u32, }, - /// A manager encountered an error during sync. + /// A manager encountered a recoverable error. 
     ///
     /// Emitted by: Any manager
     /// Consumed by: Coordinator (for logging/monitoring)

diff --git a/dash-spv/src/sync/sync_manager.rs b/dash-spv/src/sync/sync_manager.rs
index 9f35c70a4..7954a1d4f 100644
--- a/dash-spv/src/sync/sync_manager.rs
+++ b/dash-spv/src/sync/sync_manager.rs
@@ -1,4 +1,4 @@
-use crate::error::{SyncError, SyncResult};
+use crate::error::SyncResult;
 use crate::network::{Message, MessageType, NetworkEvent, RequestSender};
 use crate::sync::{
     BlockHeadersProgress, BlocksProgress, ChainLockProgress, FilterHeadersProgress,
@@ -194,26 +194,6 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug {
         }
     }
 
-    /// Log a network error and emit a `ManagerError` event.
-    ///
-    /// State is intentionally left unchanged: the `PeersUpdated` event path
-    /// handles the transition to `WaitingForConnections` when all peers are lost.
-    fn recover_from_network_error(
-        &mut self,
-        context: &SyncManagerTaskContext,
-        source: &str,
-        msg: &str,
-    ) {
-        let identifier = self.identifier();
-        log::warn!("{} {} network error: {}", identifier, source, msg);
-        let progress = self.progress();
-        context.progress_sender.send(progress).ok();
-        context.emit_sync_event(SyncEvent::ManagerError {
-            manager: identifier,
-            error: format!("Network error ({}): {}", source, msg),
-        });
-    }
-
     /// Run the manager task, processing messages, events, and periodic ticks.
     ///
     /// This consumes the manager and runs until shutdown is signaled.
@@ -255,9 +235,6 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug {
                             }
                             self.try_emit_progress(progress_before, &context.progress_sender);
                         }
-                        Err(SyncError::Network(ref msg)) => {
-                            self.recover_from_network_error(&context, "message handler", msg);
-                        }
                         Err(e) => {
                             tracing::error!("{} error handling message: {}", identifier, e);
                             let error_event = SyncEvent::ManagerError {
@@ -284,9 +261,6 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug {
                             }
                             self.try_emit_progress(progress_before, &context.progress_sender);
                         }
-                        Err(SyncError::Network(ref msg)) => {
-                            self.recover_from_network_error(&context, "sync event handler", msg);
-                        }
                         Err(e) => {
                             tracing::error!("{} error handling event: {}", identifier, e);
                         }
@@ -314,9 +288,6 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug {
                             }
                             self.try_emit_progress(progress_before, &context.progress_sender);
                         }
-                        Err(SyncError::Network(ref msg)) => {
-                            self.recover_from_network_error(&context, "network event handler", msg);
-                        }
                         Err(e) => {
                             tracing::error!("{} error handling network event: {}", identifier, e);
                         }
@@ -338,9 +309,6 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug {
                             }
                             self.try_emit_progress(progress_before, &context.progress_sender);
                         }
-                        Err(SyncError::Network(ref msg)) => {
-                            self.recover_from_network_error(&context, "tick", msg);
-                        }
                         Err(e) => {
                             tracing::error!("{} tick error: {}", identifier, e);
                         }
@@ -479,138 +447,4 @@ mod tests {
         // Verify tick was called multiple times
         assert!(tick_count.load(Ordering::Relaxed) > 0);
     }
-
-    /// Mock manager whose tick() returns SyncError::Network after a threshold.
-    struct NetworkErrorManager {
-        identifier: ManagerIdentifier,
-        state: SyncState,
-        tick_count: Arc<AtomicU32>,
-        error_after: u32,
-    }
-
-    impl std::fmt::Debug for NetworkErrorManager {
-        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-            f.debug_struct("NetworkErrorManager").field("identifier", &self.identifier).finish()
-        }
-    }
-
-    #[async_trait]
-    impl SyncManager for NetworkErrorManager {
-        fn identifier(&self) -> ManagerIdentifier {
-            self.identifier
-        }
-        fn state(&self) -> SyncState {
-            self.state
-        }
-        fn set_state(&mut self, state: SyncState) {
-            self.state = state;
-        }
-        fn wanted_message_types(&self) -> &'static [MessageType] {
-            &[]
-        }
-        fn progress(&self) -> SyncManagerProgress {
-            let mut progress = BlockHeadersProgress::default();
-            progress.set_state(self.state);
-            SyncManagerProgress::BlockHeaders(progress)
-        }
-
-        async fn handle_message(
-            &mut self,
-            _msg: Message,
-            _requests: &RequestSender,
-        ) -> SyncResult<Vec<SyncEvent>> {
-            Ok(vec![])
-        }
-
-        async fn handle_sync_event(
-            &mut self,
-            _event: &SyncEvent,
-            _requests: &RequestSender,
-        ) -> SyncResult<Vec<SyncEvent>> {
-            Ok(vec![])
-        }
-
-        async fn tick(&mut self, _requests: &RequestSender) -> SyncResult<Vec<SyncEvent>> {
-            let count = self.tick_count.fetch_add(1, Ordering::Relaxed);
-            if count >= self.error_after {
-                Err(SyncError::Network("channel closed".into()))
-            } else {
-                Ok(vec![])
-            }
-        }
-    }
-
-    /// Given a manager whose tick() returns SyncError::Network after a few calls,
-    /// When the task loop processes the error,
-    /// Then it stays in its current state and keeps running.
-    #[tokio::test]
-    async fn test_manager_resets_on_fatal_network_error() {
-        let tick_count = Arc::new(AtomicU32::new(0));
-
-        let manager = NetworkErrorManager {
-            identifier: ManagerIdentifier::BlockHeader,
-            state: SyncState::Initializing,
-            tick_count: tick_count.clone(),
-            error_after: 3,
-        };
-
-        // Create channels
-        let (_, message_receiver) = mpsc::unbounded_channel();
-        let sync_event_sender = broadcast::Sender::<SyncEvent>::new(100);
-        let network_event_sender = broadcast::Sender::<NetworkEvent>::new(100);
-        let (req_tx, _req_rx) = mpsc::unbounded_channel();
-        let requests = RequestSender::new(req_tx);
-        let shutdown = CancellationToken::new();
-        let (progress_sender, progress_rx) = watch::channel(manager.progress());
-
-        let context = SyncManagerTaskContext {
-            message_receiver,
-            sync_event_sender: sync_event_sender.clone(),
-            network_event_receiver: network_event_sender.subscribe(),
-            requests,
-            shutdown: shutdown.clone(),
-            progress_sender,
-        };
-
-        // Subscribe to sync events to verify ManagerError is emitted
-        let mut event_rx = sync_event_sender.subscribe();
-
-        // Spawn the task — it should keep running after the Network error
-        let handle = tokio::spawn(async move { manager.run(context).await });
-
-        // Wait long enough for the error to fire (tick 3 at ~300ms) plus a
-        // few more ticks so we can verify the loop keeps running.
-        tokio::time::sleep(Duration::from_millis(600)).await;
-
-        // State stays at WaitingForConnections (set by initialize(), not changed by error recovery)
-        assert_eq!(progress_rx.borrow().state(), SyncState::WaitingForConnections);
-
-        // Verify tick was called more than the error threshold (manager kept
-        // running after the Network error).
-        assert!(
-            tick_count.load(Ordering::Relaxed) > 4,
-            "manager should keep ticking after Network error"
-        );
-
-        // Verify ManagerError event was emitted
-        let mut found_error = false;
-        while let Ok(event) = event_rx.try_recv() {
-            if matches!(event, SyncEvent::ManagerError { .. }) {
-                found_error = true;
-                break;
-            }
-        }
-        assert!(found_error, "ManagerError event should have been emitted");
-
-        // Shut down the manager via the shutdown token
-        shutdown.cancel();
-
-        let result = tokio::time::timeout(Duration::from_secs(2), handle)
-            .await
-            .expect("task should exit after shutdown signal")
-            .unwrap();
-
-        assert!(result.is_ok());
-        assert_eq!(result.unwrap(), ManagerIdentifier::BlockHeader);
-    }
 }
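
For reviewers following the series: after PATCH 22 and PATCH 23, a manager
task's only prompt exit paths are the cancelled shutdown token and a closed
channel; ordinary tick errors are merely logged. The sketch below is a
minimal, self-contained illustration of that select-over-cancellation
pattern, assuming only the tokio and tokio-util crates; run_loop, tick, and
the String error are illustrative stand-ins, not dash-spv's actual API.

    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    /// Stand-in for a manager's fallible periodic work (illustrative only).
    async fn tick() -> Result<(), String> {
        Ok(())
    }

    /// Minimal manager-style loop: select over the shutdown token and a
    /// periodic tick; exit promptly on cancellation, log ordinary errors.
    async fn run_loop(shutdown: CancellationToken) {
        let mut tick_interval = tokio::time::interval(Duration::from_millis(100));
        loop {
            tokio::select! {
                // A cancelled token wins the race against the tick arm,
                // so the task observes shutdown within one loop iteration.
                _ = shutdown.cancelled() => break,
                _ = tick_interval.tick() => {
                    if let Err(e) = tick().await {
                        // Non-fatal error: log and keep the loop running.
                        eprintln!("tick error: {e}");
                    }
                }
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let shutdown = CancellationToken::new();
        let handle = tokio::spawn(run_loop(shutdown.clone()));
        // Cancel first, without awaiting joins, so every task sees the
        // signal before any teardown begins; join afterwards.
        shutdown.cancel();
        handle.await.expect("task should exit after cancellation");
    }

Cancelling the shared token before tearing anything down means every select
arm races against cancellation, so no task is left polling a dead channel
while the rest of the system shuts down around it.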