diff --git a/dash-spv/src/client/config.rs b/dash-spv/src/client/config.rs index 5d9e579bf..11ccfb568 100644 --- a/dash-spv/src/client/config.rs +++ b/dash-spv/src/client/config.rs @@ -78,42 +78,9 @@ pub struct ClientConfig { /// Maximum concurrent filter requests (default: 8). pub max_concurrent_filter_requests: usize, - /// Enable flow control for filter requests (default: true). - pub enable_filter_flow_control: bool, - /// Delay between filter requests in milliseconds (default: 50). pub filter_request_delay_ms: u64, - /// Enable automatic CFHeader gap detection and restart - pub enable_cfheader_gap_restart: bool, - - /// Interval for checking CFHeader gaps (seconds) - pub cfheader_gap_check_interval_secs: u64, - - /// Cooldown between CFHeader restart attempts (seconds) - pub cfheader_gap_restart_cooldown_secs: u64, - - /// Maximum CFHeader gap restart attempts - pub max_cfheader_gap_restart_attempts: u32, - - /// Enable automatic filter gap detection and restart - pub enable_filter_gap_restart: bool, - - /// Interval for checking filter gaps (seconds) - pub filter_gap_check_interval_secs: u64, - - /// Minimum filter gap size to trigger restart (blocks) - pub min_filter_gap_size: u32, - - /// Cooldown between filter restart attempts (seconds) - pub filter_gap_restart_cooldown_secs: u64, - - /// Maximum filter gap restart attempts - pub max_filter_gap_restart_attempts: u32, - - /// Maximum number of filters to sync in a single gap sync batch - pub max_filter_gap_sync_size: u32, - // Mempool configuration /// Enable tracking of unconfirmed (mempool) transactions. pub enable_mempool_tracking: bool, @@ -159,9 +126,6 @@ pub struct ClientConfig { /// Maximum concurrent CFHeaders requests for parallel sync (default: 50). pub max_concurrent_cfheaders_requests_parallel: usize, - /// Enable flow control for CFHeaders requests (default: true). - pub enable_cfheaders_flow_control: bool, - /// Timeout for CFHeaders requests in seconds (default: 30). pub cfheaders_request_timeout_secs: u64, @@ -210,18 +174,7 @@ impl Default for ClientConfig { log_level: "info".to_string(), user_agent: None, max_concurrent_filter_requests: 16, - enable_filter_flow_control: true, filter_request_delay_ms: 0, - enable_cfheader_gap_restart: true, - cfheader_gap_check_interval_secs: 15, - cfheader_gap_restart_cooldown_secs: 30, - max_cfheader_gap_restart_attempts: 5, - enable_filter_gap_restart: true, - filter_gap_check_interval_secs: 20, - min_filter_gap_size: 10, - filter_gap_restart_cooldown_secs: 30, - max_filter_gap_restart_attempts: 5, - max_filter_gap_sync_size: 50000, // Mempool defaults enable_mempool_tracking: true, mempool_strategy: MempoolStrategy::FetchAll, @@ -243,7 +196,6 @@ impl Default for ClientConfig { wallet_creation_time: None, // CFHeaders flow control defaults max_concurrent_cfheaders_requests_parallel: 50, - enable_cfheaders_flow_control: true, cfheaders_request_timeout_secs: 30, max_cfheaders_retries: 3, // QRInfo defaults (simplified per plan) @@ -341,12 +293,6 @@ impl ClientConfig { self } - /// Enable or disable filter flow control. - pub fn with_filter_flow_control(mut self, enabled: bool) -> Self { - self.enable_filter_flow_control = enabled; - self - } - /// Set delay between filter requests. pub fn with_filter_request_delay(mut self, delay_ms: u64) -> Self { self.filter_request_delay_ms = delay_ms; diff --git a/dash-spv/src/client/config_test.rs b/dash-spv/src/client/config_test.rs index 6bc8c14fa..a1c691181 100644 --- a/dash-spv/src/client/config_test.rs +++ b/dash-spv/src/client/config_test.rs @@ -28,7 +28,6 @@ mod tests { assert!(config.enable_persistence); assert_eq!(config.log_level, "info"); assert_eq!(config.max_concurrent_filter_requests, 16); - assert!(config.enable_filter_flow_control); assert_eq!(config.filter_request_delay_ms, 0); // Mempool defaults @@ -66,7 +65,6 @@ mod tests { .with_connection_timeout(Duration::from_secs(10)) .with_log_level("debug") .with_max_concurrent_filter_requests(32) - .with_filter_flow_control(false) .with_filter_request_delay(100) .with_mempool_tracking(MempoolStrategy::BloomFilter) .with_max_mempool_transactions(500) @@ -80,7 +78,6 @@ mod tests { assert_eq!(config.connection_timeout, Duration::from_secs(10)); assert_eq!(config.log_level, "debug"); assert_eq!(config.max_concurrent_filter_requests, 32); - assert!(!config.enable_filter_flow_control); assert_eq!(config.filter_request_delay_ms, 100); // Mempool settings @@ -196,28 +193,6 @@ mod tests { // Removed selective strategy validation test; Selective variant no longer exists - #[test] - fn test_cfheader_gap_settings() { - let config = ClientConfig::default(); - - assert!(config.enable_cfheader_gap_restart); - assert_eq!(config.cfheader_gap_check_interval_secs, 15); - assert_eq!(config.cfheader_gap_restart_cooldown_secs, 30); - assert_eq!(config.max_cfheader_gap_restart_attempts, 5); - } - - #[test] - fn test_filter_gap_settings() { - let config = ClientConfig::default(); - - assert!(config.enable_filter_gap_restart); - assert_eq!(config.filter_gap_check_interval_secs, 20); - assert_eq!(config.min_filter_gap_size, 10); - assert_eq!(config.filter_gap_restart_cooldown_secs, 30); - assert_eq!(config.max_filter_gap_restart_attempts, 5); - assert_eq!(config.max_filter_gap_sync_size, 50000); - } - #[test] fn test_request_control_defaults() { let config = ClientConfig::default(); diff --git a/dash-spv/src/client/filter_sync.rs b/dash-spv/src/client/filter_sync.rs index a1d996610..e69de29bb 100644 --- a/dash-spv/src/client/filter_sync.rs +++ b/dash-spv/src/client/filter_sync.rs @@ -1,171 +0,0 @@ -//! Filter synchronization and management for the Dash SPV client. - -use crate::error::{Result, SpvError}; -use crate::network::NetworkManager; -use crate::storage::StorageManager; -use crate::sync::SyncManager; -use crate::types::FilterMatch; -use crate::types::SpvStats; -use key_wallet_manager::wallet_interface::WalletInterface; -use std::sync::Arc; -use tokio::sync::RwLock; - -/// Filter synchronization manager for coordinating filter downloads and checking. -pub struct FilterSyncCoordinator<'a, S: StorageManager, N: NetworkManager, W: WalletInterface> { - sync_manager: &'a mut SyncManager, - storage: &'a mut S, - network: &'a mut N, - stats: &'a Arc>, - running: &'a Arc>, -} - -impl< - 'a, - S: StorageManager + Send + Sync + 'static, - N: NetworkManager + Send + Sync + 'static, - W: WalletInterface, - > FilterSyncCoordinator<'a, S, N, W> -{ - /// Create a new filter sync coordinator. - pub fn new( - sync_manager: &'a mut SyncManager, - storage: &'a mut S, - network: &'a mut N, - stats: &'a Arc>, - running: &'a Arc>, - ) -> Self { - Self { - sync_manager, - storage, - network, - stats, - running, - } - } - - /// Sync compact filters for recent blocks and check for matches. - /// Sync and check filters with internal monitoring loop management. - /// This method automatically handles the monitoring loop required for CFilter message processing. - pub async fn sync_and_check_filters_with_monitoring( - &mut self, - num_blocks: Option, - ) -> Result> { - // Just delegate to the regular method for now - the real fix is in sync_filters_coordinated - self.sync_and_check_filters(num_blocks).await - } - - pub async fn sync_and_check_filters( - &mut self, - num_blocks: Option, - ) -> Result> { - let running = self.running.read().await; - if !*running { - return Err(SpvError::Config("Client not running".to_string())); - } - drop(running); - - // Get current filter tip height to determine range (use filter headers, not block headers) - // This ensures consistency between range calculation and progress tracking - let tip_height = - self.storage.get_filter_tip_height().await.map_err(SpvError::Storage)?.unwrap_or(0); - - // Determine how many blocks to request - let num_blocks = num_blocks.unwrap_or(100).max(1); - let default_start = tip_height.saturating_sub(num_blocks - 1); - - // Ask the wallet for an earliest rescan height, falling back to the default window. - let wallet_hint = self.sync_manager.wallet_birth_height_hint().await; - let mut start_height = wallet_hint.unwrap_or(default_start).min(default_start); - - // Respect any user-provided start height hint from the configuration. - if let Some(config_start) = self.sync_manager.config_start_height() { - let capped = config_start.min(tip_height); - start_height = start_height.max(capped); - } - - // Make sure we never request past the current tip - start_height = start_height.min(tip_height); - - let actual_count = if start_height <= tip_height { - tip_height - start_height + 1 - } else { - 0 - }; - - tracing::info!( - "Requesting filters from height {} to {} ({} blocks based on filter tip height)", - start_height, - tip_height, - actual_count - ); - if let Some(hint) = wallet_hint { - tracing::debug!("Wallet hint for earliest required height: {}", hint); - } - tracing::info!("Filter processing and matching will happen automatically in background thread as CFilter messages arrive"); - - // Send filter requests - processing will happen automatically in the background - if actual_count > 0 { - self.sync_filters_coordinated(start_height, actual_count).await?; - } else { - tracing::debug!("No filters requested because calculated range is empty"); - } - - // Return empty vector since matching happens asynchronously in the filter processor thread - // Actual matches will be processed and blocks requested automatically when CFilter messages arrive - Ok(Vec::new()) - } - - /// Sync filters for a specific height range. - pub async fn sync_filters_range( - &mut self, - start_height: Option, - count: Option, - ) -> Result<()> { - // Get filter tip height to determine default values - let filter_tip_height = - self.storage.get_filter_tip_height().await.map_err(SpvError::Storage)?.unwrap_or(0); - - let start = start_height.unwrap_or(filter_tip_height.saturating_sub(99)); - let num_blocks = count.unwrap_or(100); - - tracing::info!( - "Starting filter sync for specific range from height {} ({} blocks)", - start, - num_blocks - ); - - self.sync_filters_coordinated(start, num_blocks).await - } - - /// Sync filters in coordination with the monitoring loop using flow control processing - async fn sync_filters_coordinated(&mut self, start_height: u32, count: u32) -> Result<()> { - tracing::info!("Starting coordinated filter sync with flow control from height {} to {} ({} filters expected)", - start_height, start_height + count - 1, count); - - // Start tracking filter sync progress - crate::sync::filters::FilterSyncManager::::start_filter_sync_tracking( - self.stats, - count as u64, - ) - .await; - - // Use the new flow control method - self.sync_manager - .filter_sync_mut() - .sync_filters_with_flow_control( - &mut *self.network, - &mut *self.storage, - Some(start_height), - Some(count), - ) - .await - .map_err(SpvError::Sync)?; - - let (pending_count, active_count, flow_enabled) = - self.sync_manager.filter_sync().get_flow_control_status(); - tracing::info!("✅ Filter sync with flow control initiated (flow control enabled: {}, {} requests queued, {} active)", - flow_enabled, pending_count, active_count); - - Ok(()) - } -} diff --git a/dash-spv/src/client/message_handler.rs b/dash-spv/src/client/message_handler.rs index d5e394ebb..a0a22bc22 100644 --- a/dash-spv/src/client/message_handler.rs +++ b/dash-spv/src/client/message_handler.rs @@ -6,7 +6,7 @@ use crate::mempool_filter::MempoolFilter; use crate::network::NetworkManager; use crate::storage::StorageManager; use crate::sync::SyncManager; -use crate::types::{MempoolState, SpvEvent, SpvStats}; +use crate::types::{MempoolState, SpvEvent}; // Removed local ad-hoc compact filter construction in favor of always processing full blocks use key_wallet_manager::wallet_interface::WalletInterface; use std::sync::Arc; @@ -18,7 +18,6 @@ pub struct MessageHandler<'a, S: StorageManager, N: NetworkManager, W: WalletInt storage: &'a mut S, network: &'a mut N, config: &'a ClientConfig, - stats: &'a Arc>, block_processor_tx: &'a tokio::sync::mpsc::UnboundedSender, mempool_filter: &'a Option>, mempool_state: &'a Arc>, @@ -39,7 +38,6 @@ impl< storage: &'a mut S, network: &'a mut N, config: &'a ClientConfig, - stats: &'a Arc>, block_processor_tx: &'a tokio::sync::mpsc::UnboundedSender< crate::client::BlockProcessingTask, >, @@ -52,7 +50,6 @@ impl< storage, network, config, - stats, block_processor_tx, mempool_filter, mempool_state, @@ -338,17 +335,6 @@ impl< } NetworkMessage::CFilter(cfilter) => { tracing::debug!("Received CFilter for block {}", cfilter.block_hash); - - // Record the height of this received filter for gap tracking - crate::sync::filters::FilterSyncManager::::record_filter_received_at_height( - self.stats, - &*self.storage, - &cfilter.block_hash, - ) - .await; - - // Sequential sync manager handles the filter internally - // For sequential sync, filter checking is done within the sync manager } NetworkMessage::SendDsq(wants_dsq) => { tracing::info!("Received SendDsq message - peer wants DSQ messages: {}", wants_dsq); diff --git a/dash-spv/src/client/mod.rs b/dash-spv/src/client/mod.rs index c65d5131d..cdb410f0e 100644 --- a/dash-spv/src/client/mod.rs +++ b/dash-spv/src/client/mod.rs @@ -17,7 +17,6 @@ //! //! - `block_processor.rs` (649 lines) - Block processing and validation //! - `config.rs` (484 lines) - Client configuration -//! - `filter_sync.rs` (171 lines) - Filter synchronization //! - `message_handler.rs` (585 lines) - Network message handling //! - `status_display.rs` (242 lines) - Status display formatting //! @@ -35,7 +34,6 @@ // Existing extracted modules pub mod block_processor; pub mod config; -pub mod filter_sync; pub mod interface; pub mod message_handler; pub mod status_display; @@ -54,7 +52,6 @@ mod transactions; // Re-export public types from extracted modules pub use block_processor::{BlockProcessingTask, BlockProcessor}; pub use config::ClientConfig; -pub use filter_sync::FilterSyncCoordinator; pub use message_handler::MessageHandler; pub use status_display::StatusDisplay; diff --git a/dash-spv/src/client/sync_coordinator.rs b/dash-spv/src/client/sync_coordinator.rs index fb9a475d9..c84287438 100644 --- a/dash-spv/src/client/sync_coordinator.rs +++ b/dash-spv/src/client/sync_coordinator.rs @@ -99,11 +99,6 @@ impl< let mut last_consistency_check = Instant::now(); let consistency_check_interval = Duration::from_secs(300); // Every 5 minutes - // Timer for filter gap checking - let mut last_filter_gap_check = Instant::now(); - let filter_gap_check_interval = - Duration::from_secs(self.config.cfheader_gap_check_interval_secs); - // Timer for pending ChainLock validation let mut last_chainlock_validation_check = Instant::now(); let chainlock_validation_interval = Duration::from_secs(30); // Every 30 seconds @@ -437,18 +432,6 @@ impl< last_consistency_check = Instant::now(); } - // Check for missing filters and retry periodically - if last_filter_gap_check.elapsed() >= filter_gap_check_interval { - if self.config.enable_filters { - // Sequential sync handles filter retries internally - - // Sequential sync handles CFHeader gap detection and recovery internally - - // Sequential sync handles filter gap detection and recovery internally - } - last_filter_gap_check = Instant::now(); - } - // Check if masternode sync has completed and update ChainLock validation if !masternode_engine_updated && self.config.enable_masternodes { // Check if we have a masternode engine available now @@ -642,7 +625,6 @@ impl< &mut *storage, &mut self.network, &self.config, - &self.stats, &self.block_processor_tx, &self.mempool_filter, &self.mempool_state, @@ -736,35 +718,6 @@ impl< Ok(()) } - /// Sync filters and check for wallet matches (legacy method). - pub async fn sync_and_check_filters_with_monitoring( - &mut self, - num_blocks: Option, - ) -> Result> { - self.sync_and_check_filters(num_blocks).await - } - - /// Sync filters and check for wallet matches. - pub async fn sync_and_check_filters( - &mut self, - _num_blocks: Option, - ) -> Result> { - // Sequential sync handles filter sync internally - tracing::info!("Sequential sync mode: filter sync handled internally"); - Ok(Vec::new()) - } - - /// Sync filters for a specific height range. - pub async fn sync_filters_range( - &mut self, - _start_height: Option, - _count: Option, - ) -> Result<()> { - // Sequential sync handles filter range sync internally - tracing::info!("Sequential sync mode: filter range sync handled internally"); - Ok(()) - } - // ============ Sync State Persistence and Restoration ============ /// Restore sync state from persistent storage. diff --git a/dash-spv/src/network/tests.rs b/dash-spv/src/network/tests.rs index d69239b30..0b899cc21 100644 --- a/dash-spv/src/network/tests.rs +++ b/dash-spv/src/network/tests.rs @@ -27,23 +27,11 @@ mod peer_network_manager_tests { max_peers: 3, enable_persistence: false, log_level: "info".to_string(), - enable_filter_flow_control: true, filter_request_delay_ms: 0, max_concurrent_filter_requests: 50, max_concurrent_cfheaders_requests_parallel: 50, - enable_cfheaders_flow_control: true, cfheaders_request_timeout_secs: 30, max_cfheaders_retries: 3, - enable_cfheader_gap_restart: true, - cfheader_gap_check_interval_secs: 15, - cfheader_gap_restart_cooldown_secs: 30, - max_cfheader_gap_restart_attempts: 5, - enable_filter_gap_restart: true, - filter_gap_check_interval_secs: 20, - min_filter_gap_size: 10, - filter_gap_restart_cooldown_secs: 30, - max_filter_gap_restart_attempts: 5, - max_filter_gap_sync_size: 50000, // Mempool fields enable_mempool_tracking: false, mempool_strategy: crate::client::config::MempoolStrategy::BloomFilter, diff --git a/dash-spv/src/sync/filters/download.rs b/dash-spv/src/sync/filters/download.rs index 292c713db..c325df0bc 100644 --- a/dash-spv/src/sync/filters/download.rs +++ b/dash-spv/src/sync/filters/download.rs @@ -5,7 +5,7 @@ //! //! ## Key Features //! -//! - Filter request queue management with flow control +//! - Filter request queue management //! - Parallel filter downloads with concurrency limits //! - Filter verification against CFHeaders //! - Individual filter header downloads for blocks @@ -16,7 +16,6 @@ use dashcore::{ BlockHash, }; -use super::types::*; use crate::error::{SyncError, SyncResult}; use crate::network::NetworkManager; use crate::storage::StorageManager; @@ -70,104 +69,14 @@ impl, - count: Option, - ) -> SyncResult { - if self.syncing_filters { - return Err(SyncError::SyncInProgress); - } - - self.syncing_filters = true; - - // Determine range to sync - let filter_tip_height = storage - .get_filter_tip_height() - .await - .map_err(|e| SyncError::Storage(format!("Failed to get filter tip: {}", e)))? - .unwrap_or(0); - - let start = start_height.unwrap_or_else(|| { - // Default: sync last blocks for recent transaction discovery - filter_tip_height.saturating_sub(DEFAULT_FILTER_SYNC_RANGE) - }); - - let end = count.map(|c| start + c - 1).unwrap_or(filter_tip_height).min(filter_tip_height); // Ensure we don't go beyond available filter headers - - let base_height = self.sync_base_height; - let clamped_start = start.max(base_height); - - if clamped_start > end { - self.syncing_filters = false; - return Ok(SyncProgress::default()); - } - - tracing::info!( - "🔄 Starting compact filter sync from height {} to {} ({} blocks)", - clamped_start, - end, - end - clamped_start + 1 - ); - - // Request filters in batches - let batch_size = FILTER_REQUEST_BATCH_SIZE; - let mut current_height = clamped_start; - let mut filters_downloaded = 0; - - while current_height <= end { - let batch_end = (current_height + batch_size - 1).min(end); - - tracing::debug!("Requesting filters for heights {} to {}", current_height, batch_end); - - let stop_hash = storage - .get_header(batch_end) - .await - .map_err(|e| SyncError::Storage(format!("Failed to get stop header: {}", e)))? - .ok_or_else(|| SyncError::Storage("Stop header not found".to_string()))? - .block_hash(); - - self.request_filters(network, current_height, stop_hash).await?; - - // Note: Filter responses will be handled by the monitoring loop - // This method now just sends requests and trusts that responses - // will be processed by the centralized message handler - tracing::debug!("Sent filter request for batch {} to {}", current_height, batch_end); - - let batch_size_actual = batch_end - current_height + 1; - filters_downloaded += batch_size_actual; - current_height = batch_end + 1; - } - - self.syncing_filters = false; - - tracing::info!( - "✅ Compact filter synchronization completed. Downloaded {} filters", - filters_downloaded - ); - - Ok(SyncProgress { - filters_downloaded: filters_downloaded as u64, - ..SyncProgress::default() - }) - } - pub async fn sync_filters_with_flow_control( + pub async fn sync_filters( &mut self, network: &mut N, storage: &mut S, start_height: Option, count: Option, ) -> SyncResult { - if !self.flow_control_enabled { - // Fall back to original method if flow control is disabled - return self.sync_filters(network, storage, start_height, count).await; - } - if self.syncing_filters { return Err(SyncError::SyncInProgress); } @@ -180,13 +89,13 @@ impl SyncResult<()> { - if !self.flow_control_enabled { - return Ok(()); - } - // Record the received filter self.record_individual_filter_received(block_hash, storage).await?; @@ -316,48 +221,6 @@ impl SyncResult<()> { - // Find the end height for the stop hash - let header_tip_height = storage - .get_tip_height() - .await - .map_err(|e| SyncError::Storage(format!("Failed to get header tip height: {}", e)))? - .ok_or_else(|| { - SyncError::Storage("No headers available for filter sync".to_string()) - })?; - - let end_height = self - .find_height_for_block_hash(&stop_hash, storage, start_height, header_tip_height) - .await? - .ok_or_else(|| { - SyncError::Validation(format!( - "Cannot find height for stop hash {} in range {}-{}", - stop_hash, start_height, header_tip_height - )) - })?; - - // Safety check: ensure we don't request more than the Dash Core limit - let range_size = end_height.saturating_sub(start_height) + 1; - if range_size > MAX_FILTER_REQUEST_SIZE { - return Err(SyncError::Validation(format!( - "Filter request range {}-{} ({} filters) exceeds maximum allowed size of {}", - start_height, end_height, range_size, MAX_FILTER_REQUEST_SIZE - ))); - } - - // Record this request for tracking - self.record_filter_request(start_height, end_height); - - // Send the actual request - self.request_filters(network, start_height, stop_hash).await - } - pub(super) async fn find_height_for_block_hash( &self, block_hash: &BlockHash, @@ -381,94 +244,6 @@ impl SyncResult<()> { - // Get the block height for this hash by scanning headers - let header_tip_height = storage - .get_tip_height() - .await - .map_err(|e| SyncError::Storage(format!("Failed to get header tip height: {}", e)))? - .ok_or_else(|| { - SyncError::Storage("No headers available for filter sync".to_string()) - })?; - - let height = self - .find_height_for_block_hash(&block_hash, storage, 0, header_tip_height) - .await? - .ok_or_else(|| { - SyncError::Validation(format!( - "Cannot find height for block {} - header not found", - block_hash - )) - })?; - - // Check if we already have this filter header - if storage - .get_filter_header(height) - .await - .map_err(|e| SyncError::Storage(format!("Failed to check filter header: {}", e)))? - .is_some() - { - tracing::debug!( - "Filter header for block {} at height {} already exists", - block_hash, - height - ); - return Ok(()); - } - - tracing::info!("📥 Requesting filter header for block {} at height {}", block_hash, height); - - // Request filter header using getcfheaders - self.request_filter_headers(network, height, block_hash).await?; - - Ok(()) - } - - pub async fn download_and_check_filter( - &mut self, - block_hash: BlockHash, - network: &mut N, - storage: &mut S, - ) -> SyncResult { - // TODO: Will check with wallet once integrated - - // Get the block height for this hash by scanning headers - let header_tip_height = storage - .get_tip_height() - .await - .map_err(|e| SyncError::Storage(format!("Failed to get header tip height: {}", e)))? - .unwrap_or(0); - - let height = self - .find_height_for_block_hash(&block_hash, storage, 0, header_tip_height) - .await? - .ok_or_else(|| { - SyncError::Validation(format!( - "Cannot find height for block {} - header not found", - block_hash - )) - })?; - - tracing::info!( - "📥 Requesting compact filter for block {} at height {}", - block_hash, - height - ); - - // Request the compact filter using getcfilters - self.request_filters(network, height, block_hash).await?; - - // Note: The actual filter checking will happen when we receive the CFilter message - // This method just initiates the download. The client will need to handle the response. - - Ok(false) // Return false for now, will be updated when we process the response - } - pub async fn store_filter_headers( &mut self, cfheaders: dashcore::network::message_filter::CFHeaders, diff --git a/dash-spv/src/sync/filters/gaps.rs b/dash-spv/src/sync/filters/gaps.rs deleted file mode 100644 index 289e2e893..000000000 --- a/dash-spv/src/sync/filters/gaps.rs +++ /dev/null @@ -1,490 +0,0 @@ -//! Gap detection and recovery logic. -//! -//! This module handles: -//! - Detecting gaps between headers and filter headers -//! - Detecting gaps between filter headers and downloaded filters -//! - Finding missing filter ranges within requested ranges -//! - Retrying missing or timed-out filter requests -//! - Auto-restarting filter header sync when gaps are detected - -use super::types::*; -use crate::error::{SyncError, SyncResult}; -use crate::network::NetworkManager; -use crate::storage::StorageManager; -use std::collections::HashSet; - -impl - super::manager::FilterSyncManager -{ - /// Record a filter request for a height range. - /// - /// Tracks when the request was made for timeout detection. - pub fn record_filter_request(&mut self, start_height: u32, end_height: u32) { - self.requested_filter_ranges.insert((start_height, end_height), std::time::Instant::now()); - tracing::debug!("📊 Recorded filter request for range {}-{}", start_height, end_height); - } - - /// Record receipt of a filter at a specific height. - pub fn record_filter_received(&mut self, height: u32) { - if let Ok(mut heights) = self.received_filter_heights.try_lock() { - heights.insert(height); - tracing::trace!("📊 Recorded filter received at height {}", height); - } - } - - /// Find missing filter ranges within the requested ranges. - /// - /// Returns a list of (start_height, end_height) tuples for ranges where - /// filters were requested but not all filters have been received. - pub fn find_missing_ranges(&self) -> Vec<(u32, u32)> { - let mut missing_ranges = Vec::new(); - - let heights = match self.received_filter_heights.try_lock() { - Ok(heights) => heights.clone(), - Err(_) => return missing_ranges, - }; - - // For each requested range - for (start, end) in self.requested_filter_ranges.keys() { - let mut current = *start; - - // Find gaps within this range - while current <= *end { - if !heights.contains(¤t) { - // Start of a gap - let gap_start = current; - - // Find end of gap - while current <= *end && !heights.contains(¤t) { - current += 1; - } - - missing_ranges.push((gap_start, current - 1)); - } else { - current += 1; - } - } - } - - // Merge adjacent ranges for efficiency - Self::merge_adjacent_ranges(&mut missing_ranges); - missing_ranges - } - - /// Check if a filter range is complete (all heights received). - pub fn is_range_complete(&self, start_height: u32, end_height: u32) -> bool { - let heights = match self.received_filter_heights.try_lock() { - Ok(heights) => heights, - Err(_) => return false, - }; - - for height in start_height..=end_height { - if !heights.contains(&height) { - return false; - } - } - true - } - - /// Get total number of missing filters across all ranges. - pub fn get_total_missing_filters(&self) -> u32 { - let missing_ranges = self.find_missing_ranges(); - missing_ranges.iter().map(|(start, end)| end - start + 1).sum() - } - - /// Get actual coverage percentage (considering gaps). - /// - /// Returns percentage of requested filters that have been received. - pub fn get_actual_coverage_percentage(&self) -> f64 { - if self.requested_filter_ranges.is_empty() { - return 0.0; - } - - let total_requested: u32 = - self.requested_filter_ranges.iter().map(|((start, end), _)| end - start + 1).sum(); - - if total_requested == 0 { - return 0.0; - } - - let total_missing = self.get_total_missing_filters(); - let received = total_requested - total_missing; - - (received as f64 / total_requested as f64) * 100.0 - } - - /// Check if there's a gap between block headers and filter headers. - /// - /// Returns (has_gap, block_height, filter_height, gap_size). - /// A gap of <= 1 block is considered normal (edge case at tip). - pub async fn check_cfheader_gap(&self, storage: &S) -> SyncResult<(bool, u32, u32, u32)> { - let block_height = storage - .get_tip_height() - .await - .map_err(|e| SyncError::Storage(format!("Failed to get block tip: {}", e)))? - .unwrap_or(0); - - let filter_height = storage - .get_filter_tip_height() - .await - .map_err(|e| SyncError::Storage(format!("Failed to get filter tip: {}", e)))? - .unwrap_or(0); - - let gap_size = block_height.saturating_sub(filter_height); - - // Consider within 1 block as "no gap" to handle edge cases at the tip - let has_gap = gap_size > 1; - - tracing::debug!( - "CFHeader gap check: block_height={}, filter_height={}, gap={}", - block_height, - filter_height, - gap_size - ); - - Ok((has_gap, block_height, filter_height, gap_size)) - } - - /// Check if there's a gap between synced filters and filter headers. - /// - /// Returns (has_gap, filter_header_height, last_synced_filter, gap_size). - pub async fn check_filter_gap( - &self, - storage: &S, - progress: &crate::types::SyncProgress, - ) -> SyncResult<(bool, u32, u32, u32)> { - // Get filter header tip height - let filter_header_height = storage - .get_filter_tip_height() - .await - .map_err(|e| SyncError::Storage(format!("Failed to get filter tip height: {}", e)))? - .unwrap_or(0); - - // Get last synced filter height from progress tracking - let last_synced_filter = progress.last_synced_filter_height.unwrap_or(0); - - // Calculate gap - let gap_size = filter_header_height.saturating_sub(last_synced_filter); - let has_gap = gap_size > 0; - - tracing::debug!( - "Filter gap check: filter_header_height={}, last_synced_filter={}, gap={}", - filter_header_height, - last_synced_filter, - gap_size - ); - - Ok((has_gap, filter_header_height, last_synced_filter, gap_size)) - } - - /// Attempt to restart filter header sync if there's a gap and conditions are met. - /// - /// Returns true if sync was restarted, false otherwise. - /// Respects cooldown period and max retry attempts to prevent spam. - pub async fn maybe_restart_cfheader_sync_for_gap( - &mut self, - network: &mut N, - storage: &mut S, - ) -> SyncResult { - // Check if we're already syncing - if self.syncing_filter_headers { - return Ok(false); - } - - // Check gap detection cooldown - if let Some(last_attempt) = self.last_gap_restart_attempt { - if last_attempt.elapsed() < self.gap_restart_cooldown { - return Ok(false); // Too soon since last attempt - } - } - - // Check if we've exceeded max attempts - if self.gap_restart_failure_count >= self.max_gap_restart_attempts { - tracing::warn!( - "⚠️ CFHeader gap restart disabled after {} failed attempts", - self.max_gap_restart_attempts - ); - return Ok(false); - } - - // Check for gap - let (has_gap, block_height, filter_height, gap_size) = - self.check_cfheader_gap(storage).await?; - - if !has_gap { - // Reset failure count if no gap - if self.gap_restart_failure_count > 0 { - tracing::debug!("✅ CFHeader gap resolved, resetting failure count"); - self.gap_restart_failure_count = 0; - } - return Ok(false); - } - - // Gap detected - attempt restart - tracing::info!( - "🔄 CFHeader gap detected: {} block headers vs {} filter headers (gap: {})", - block_height, - filter_height, - gap_size - ); - tracing::info!("🚀 Auto-restarting filter header sync to close gap..."); - - self.last_gap_restart_attempt = Some(std::time::Instant::now()); - - match self.start_sync_headers(network, storage).await { - Ok(started) => { - if started { - tracing::info!("✅ CFHeader sync restarted successfully"); - self.gap_restart_failure_count = 0; // Reset on success - Ok(true) - } else { - tracing::warn!( - "⚠️ CFHeader sync restart returned false (already up to date?)" - ); - self.gap_restart_failure_count += 1; - Ok(false) - } - } - Err(e) => { - tracing::error!("❌ Failed to restart CFHeader sync: {}", e); - self.gap_restart_failure_count += 1; - Err(e) - } - } - } - - /// Retry missing or timed out filter ranges. - /// - /// Finds missing and timed-out ranges, deduplicates them, and re-requests. - /// Respects max retry count and batch size limits. - /// Returns number of ranges retried. - pub async fn retry_missing_filters(&mut self, network: &mut N, storage: &S) -> SyncResult { - let missing = self.find_missing_ranges(); - let timed_out = self.get_timed_out_ranges(std::time::Duration::from_secs(30)); - - // Combine and deduplicate - let mut ranges_to_retry: HashSet<(u32, u32)> = missing.into_iter().collect(); - ranges_to_retry.extend(timed_out); - - if ranges_to_retry.is_empty() { - return Ok(0); - } - - let mut retried_count = 0; - - for (start, end) in ranges_to_retry { - let retry_count = self.filter_retry_counts.get(&(start, end)).copied().unwrap_or(0); - - if retry_count >= self.max_filter_retries { - tracing::error!( - "❌ Filter range {}-{} failed after {} retries, giving up", - start, - end, - retry_count - ); - continue; - } - - // Ensure retry end height is within the stored header window - if self.header_abs_to_storage_index(end).is_none() { - tracing::debug!( - "Skipping retry for range {}-{} because end is below checkpoint base {}", - start, - end, - self.sync_base_height - ); - continue; - } - - match storage.get_header(end).await { - Ok(Some(header)) => { - let stop_hash = header.block_hash(); - - tracing::info!( - "🔄 Retrying filter range {}-{} (attempt {}/{})", - start, - end, - retry_count + 1, - self.max_filter_retries - ); - - // Re-request the range, but respect batch size limits - let range_size = end - start + 1; - if range_size <= MAX_FILTER_REQUEST_SIZE { - // Range is within limits, request directly - self.request_filters(network, start, stop_hash).await?; - self.filter_retry_counts.insert((start, end), retry_count + 1); - retried_count += 1; - } else { - // Range is too large, split into smaller batches - tracing::warn!( - "Filter range {}-{} ({} filters) exceeds Dash Core's 1000 filter limit, splitting into batches", - start, - end, - range_size - ); - - let max_batch_size = MAX_FILTER_REQUEST_SIZE; - let mut current_start = start; - - while current_start <= end { - let batch_end = (current_start + max_batch_size - 1).min(end); - - if self.header_abs_to_storage_index(batch_end).is_none() { - tracing::debug!( - "Skipping retry batch {}-{} because batch end is below checkpoint base {}", - current_start, - batch_end, - self.sync_base_height - ); - current_start = batch_end + 1; - continue; - } - - match storage.get_header(batch_end).await { - Ok(Some(batch_header)) => { - let batch_stop_hash = batch_header.block_hash(); - - tracing::info!( - "🔄 Retrying filter batch {}-{} (part of range {}-{}, attempt {}/{})", - current_start, - batch_end, - start, - end, - retry_count + 1, - self.max_filter_retries - ); - - self.request_filters(network, current_start, batch_stop_hash) - .await?; - current_start = batch_end + 1; - } - Ok(None) => { - tracing::warn!( - "Missing header at height {} for batch retry, continuing to next batch", - batch_end - ); - current_start = batch_end + 1; - } - Err(e) => { - tracing::error!( - "Error retrieving header at height {}: {:?}, continuing to next batch", - batch_end, - e - ); - current_start = batch_end + 1; - } - } - } - - // Update retry count for the original range - self.filter_retry_counts.insert((start, end), retry_count + 1); - retried_count += 1; - } - } - Ok(None) => { - tracing::error!( - "Cannot retry filter range {}-{}: header not found at height {}", - start, - end, - end - ); - } - Err(e) => { - tracing::error!("Failed to get header at height {} for retry: {}", end, e); - } - } - } - - if retried_count > 0 { - tracing::info!("📡 Retried {} filter ranges", retried_count); - } - - Ok(retried_count) - } - - /// Check and retry missing filters (main entry point for monitoring loop). - /// - /// Logs diagnostic information about missing ranges before retrying. - pub async fn check_and_retry_missing_filters( - &mut self, - network: &mut N, - storage: &S, - ) -> SyncResult<()> { - let missing_ranges = self.find_missing_ranges(); - let total_missing = self.get_total_missing_filters(); - - if total_missing > 0 { - tracing::info!( - "📊 Filter gap check: {} missing ranges covering {} filters", - missing_ranges.len(), - total_missing - ); - - // Show first few missing ranges for debugging - for (i, (start, end)) in missing_ranges.iter().enumerate() { - if i >= 5 { - tracing::info!(" ... and {} more missing ranges", missing_ranges.len() - 5); - break; - } - tracing::info!(" Missing range: {}-{} ({} filters)", start, end, end - start + 1); - } - - let retried = self.retry_missing_filters(network, storage).await?; - if retried > 0 { - tracing::info!("✅ Initiated retry for {} filter ranges", retried); - } - } - - Ok(()) - } - - /// Merge adjacent ranges for efficiency, but respect the maximum filter request size. - /// - /// Sorts ranges, merges adjacent ones if they don't exceed MAX_FILTER_REQUEST_SIZE, - /// and splits any ranges that exceed the limit. - fn merge_adjacent_ranges(ranges: &mut Vec<(u32, u32)>) { - if ranges.is_empty() { - return; - } - - ranges.sort_by_key(|(start, _)| *start); - - let mut merged = Vec::new(); - let mut current = ranges[0]; - - for &(start, end) in ranges.iter().skip(1) { - let potential_merged_size = end.saturating_sub(current.0) + 1; - - if start <= current.1 + 1 && potential_merged_size <= MAX_FILTER_REQUEST_SIZE { - // Merge ranges only if the result doesn't exceed the limit - current.1 = current.1.max(end); - } else { - // Non-adjacent or would exceed limit, push current and start new - merged.push(current); - current = (start, end); - } - } - - merged.push(current); - - // Final pass: split any ranges that still exceed the limit - let mut final_ranges = Vec::new(); - for (start, end) in merged { - let range_size = end.saturating_sub(start) + 1; - if range_size <= MAX_FILTER_REQUEST_SIZE { - final_ranges.push((start, end)); - } else { - // Split large range into smaller chunks - let mut chunk_start = start; - while chunk_start <= end { - let chunk_end = (chunk_start + MAX_FILTER_REQUEST_SIZE - 1).min(end); - final_ranges.push((chunk_start, chunk_end)); - chunk_start = chunk_end + 1; - } - } - } - - *ranges = final_ranges; - } -} diff --git a/dash-spv/src/sync/filters/headers.rs b/dash-spv/src/sync/filters/headers.rs index 42992873c..cbe213967 100644 --- a/dash-spv/src/sync/filters/headers.rs +++ b/dash-spv/src/sync/filters/headers.rs @@ -151,337 +151,9 @@ impl { - tracing::debug!( - "Received CFHeaders batch: batch_start={} (hash={}), msg_prev_header={} at {}, expected_start={} (hash={}), effective_prev_height={}, stop={}, count={}", - batch_start_height, - batch_hash, - cf_headers.previous_filter_header, - prev_height, - self.current_sync_height, - expected_hash, - effective_prev_height, - stop_height, - cf_headers.filter_hashes.len() - ); - } - (None, Some(expected_hash)) => { - tracing::debug!( - "Received CFHeaders batch: batch_start={} (hash=), msg_prev_header={} at {}, expected_start={} (hash={}), effective_prev_height={}, stop={}, count={}", - batch_start_height, - cf_headers.previous_filter_header, - prev_height, - self.current_sync_height, - expected_hash, - effective_prev_height, - stop_height, - cf_headers.filter_hashes.len() - ); - } - (Some(batch_hash), None) => { - tracing::debug!( - "Received CFHeaders batch: batch_start={} (hash={}), msg_prev_header={} at {}, expected_start={} (hash=), effective_prev_height={}, stop={}, count={}", - batch_start_height, - batch_hash, - cf_headers.previous_filter_header, - prev_height, - self.current_sync_height, - effective_prev_height, - stop_height, - cf_headers.filter_hashes.len() - ); - } - (None, None) => { - tracing::debug!( - "Received CFHeaders batch: batch_start={} (hash=), msg_prev_header={} at {}, expected_start={} (hash=), effective_prev_height={}, stop={}, count={}", - batch_start_height, - cf_headers.previous_filter_header, - prev_height, - self.current_sync_height, - effective_prev_height, - stop_height, - cf_headers.filter_hashes.len() - ); - } - } - - // Check if this is the expected batch or if there's overlap - if batch_start_height < self.current_sync_height { - // Special-case benign overlaps around checkpoint boundaries; log at debug level - let benign_checkpoint_overlap = self.sync_base_height > 0 - && ((batch_start_height + 1 == self.sync_base_height - && self.current_sync_height == self.sync_base_height) - || (batch_start_height == self.sync_base_height - && self.current_sync_height == self.sync_base_height + 1)); - - // Try to include the peer address for diagnostics - let peer_addr = network.get_last_message_peer_addr().await; - if benign_checkpoint_overlap { - match peer_addr { - Some(addr) => { - tracing::debug!( - "📋 Benign checkpoint overlap from {}: expected start={}, received start={}", - addr, - self.current_sync_height, - batch_start_height - ); - } - None => { - tracing::debug!( - "📋 Benign checkpoint overlap: expected start={}, received start={}", - self.current_sync_height, - batch_start_height - ); - } - } - } else { - match peer_addr { - Some(addr) => { - tracing::warn!( - "📋 Received overlapping filter headers from {}: expected start={}, received start={} (likely from recovery/retry)", - addr, - self.current_sync_height, - batch_start_height - ); - } - None => { - tracing::warn!( - "📋 Received overlapping filter headers: expected start={}, received start={} (likely from recovery/retry)", - self.current_sync_height, - batch_start_height - ); - } - } - } - - // Handle overlapping headers using the helper method - let (new_headers_stored, new_current_height) = self - .handle_overlapping_headers(&cf_headers, self.current_sync_height, storage) - .await?; - self.current_sync_height = new_current_height; - - // Only record progress if we actually stored new headers - if new_headers_stored > 0 { - self.last_sync_progress = std::time::Instant::now(); - } - } else if batch_start_height > self.current_sync_height { - // Gap in the sequence - this shouldn't happen in normal operation - tracing::error!( - "❌ Gap detected in filter header sequence: expected start={}, received start={} (gap of {} headers)", - self.current_sync_height, - batch_start_height, - batch_start_height - self.current_sync_height - ); - return Err(SyncError::Validation(format!( - "Gap in filter header sequence: expected {}, got {}", - self.current_sync_height, batch_start_height - ))); - } else { - // This is the expected batch - process it - match self.verify_filter_header_chain(&cf_headers, batch_start_height, storage).await { - Ok(true) => { - tracing::debug!( - "✅ Filter header chain verification successful for batch {}-{}", - batch_start_height, - stop_height - ); - - // Store the verified filter headers - self.store_filter_headers(cf_headers.clone(), storage).await?; - - // Update current height and record progress - self.current_sync_height = stop_height + 1; - self.last_sync_progress = std::time::Instant::now(); - - // Check if we've reached the header tip - if stop_height >= header_tip_height { - // Perform stability check before declaring completion - if let Ok(is_stable) = self.check_filter_header_stability(storage).await { - if is_stable { - tracing::info!( - "🎯 Filter header sync complete at height {} (stability confirmed)", - stop_height - ); - self.syncing_filter_headers = false; - return Ok(false); - } else { - tracing::debug!( - "Filter header sync reached tip at height {} but stability check failed, continuing sync", - stop_height - ); - } - } else { - tracing::debug!( - "Filter header sync reached tip at height {} but stability check errored, continuing sync", - stop_height - ); - } - } - - // Check if our next sync height would exceed the header tip - if self.current_sync_height > header_tip_height { - tracing::info!( - "Filter header sync complete - current sync height {} exceeds header tip {}", - self.current_sync_height, - header_tip_height - ); - self.syncing_filter_headers = false; - return Ok(false); - } - - // Request next batch - let next_batch_end_height = - (self.current_sync_height + FILTER_BATCH_SIZE - 1).min(header_tip_height); - tracing::debug!( - "Calculated next batch end height: {} (current: {}, tip: {})", - next_batch_end_height, - self.current_sync_height, - header_tip_height - ); - - let stop_hash = if next_batch_end_height < header_tip_height { - // Try to get the header at the calculated height - match storage.get_header(next_batch_end_height).await { - Ok(Some(header)) => header.block_hash(), - Ok(None) => { - tracing::warn!( - "Header not found at blockchain height {}, scanning backwards to find actual available height", - next_batch_end_height - ); - - let min_height = self.current_sync_height; // Don't go below where we are - match self - .find_available_header_at_or_before( - next_batch_end_height.saturating_sub(1), - min_height, - storage, - ) - .await - { - Some((hash, height)) => { - if height < self.current_sync_height { - tracing::warn!( - "Found header at height {} which is less than current sync height {}. This means we already have filter headers up to {}. Marking sync as complete.", - height, - self.current_sync_height, - self.current_sync_height - 1 - ); - self.syncing_filter_headers = false; - return Ok(false); - } - hash - } - None => { - tracing::error!( - "No available headers found between {} and {} - storage appears to have gaps", - min_height, - next_batch_end_height - ); - tracing::error!( - "This indicates a serious storage inconsistency. Stopping filter header sync." - ); - self.syncing_filter_headers = false; - return Err(SyncError::Storage(format!( - "No available headers found between {} and {} while selecting next batch stop hash", - min_height, - next_batch_end_height - ))); - } - } - } - Err(e) => { - return Err(SyncError::Storage(format!( - "Failed to get next batch stop header at height {}: {}", - next_batch_end_height, e - ))); - } - } - } else { - // Special handling for chain tip: if we can't find the exact tip header, - // try the previous header as we might be at the actual chain tip - match storage.get_header(header_tip_height).await { - Ok(Some(header)) => header.block_hash(), - Ok(None) if header_tip_height > 0 => { - tracing::debug!( - "Tip header not found at blockchain height {}, trying previous header", - header_tip_height - ); - // Try previous header when at chain tip - match storage.get_header(header_tip_height - 1).await { - Ok(Some(header)) => header.block_hash(), - _ => { - tracing::warn!( - "⚠️ No header found at tip or tip-1 during CFHeaders handling" - ); - return Err(SyncError::Validation( - "No header found at tip or tip-1".to_string(), - )); - } - } - } - _ => { - return Err(SyncError::Validation( - "No header found at computed end height".to_string(), - )); - } - } - }; - - self.request_filter_headers(network, self.current_sync_height, stop_hash) - .await?; - } - Ok(false) => { - tracing::warn!( - "⚠️ Filter header chain verification failed for batch {}-{}", - batch_start_height, - stop_height - ); - return Err(SyncError::Validation( - "Filter header chain verification failed".to_string(), - )); - } - Err(e) => { - tracing::error!("❌ Filter header chain verification failed: {}", e); - return Err(e); - } - } - } - - Ok(true) + self.handle_filter_headers(cf_headers, storage, network).await } + pub async fn start_sync_headers( &mut self, network: &mut N, @@ -700,8 +372,8 @@ impl SyncResult<()> { // Send initial batch up to max_concurrent_cfheader_requests let initial_send_count = @@ -904,8 +576,8 @@ impl header_tip) } - /// Clear flow control state. - fn clear_cfheader_flow_control_state(&mut self) { + /// Clear sync state. + fn clear_filter_header_sync_state(&mut self) { self.pending_cfheader_requests.clear(); self.active_cfheader_requests.clear(); self.cfheader_retry_counts.clear(); @@ -1308,41 +980,4 @@ impl SyncResult { - let current_filter_tip = storage - .get_filter_tip_height() - .await - .map_err(|e| SyncError::Storage(format!("Failed to get filter tip height: {}", e)))?; - - let now = std::time::Instant::now(); - - // Check if the tip height has changed since last check - if self.last_filter_tip_height != current_filter_tip { - // Tip height changed, reset stability timer - self.last_filter_tip_height = current_filter_tip; - self.last_stability_check = now; - tracing::debug!( - "Filter tip height changed to {:?}, resetting stability timer", - current_filter_tip - ); - return Ok(false); - } - - // Check if enough time has passed since last change - const STABILITY_DURATION: std::time::Duration = std::time::Duration::from_secs(3); - if now.duration_since(self.last_stability_check) >= STABILITY_DURATION { - tracing::debug!( - "Filter header sync stability confirmed (tip height {:?} stable for 3+ seconds)", - current_filter_tip - ); - return Ok(true); - } - - tracing::debug!( - "Filter header sync stability check: waiting for tip height {:?} to stabilize", - current_filter_tip - ); - Ok(false) - } } diff --git a/dash-spv/src/sync/filters/manager.rs b/dash-spv/src/sync/filters/manager.rs index e4f95447f..aeb726e6b 100644 --- a/dash-spv/src/sync/filters/manager.rs +++ b/dash-spv/src/sync/filters/manager.rs @@ -1,7 +1,7 @@ //! Filter synchronization manager - main coordinator. //! //! This module contains the FilterSyncManager struct and high-level coordination logic -//! that delegates to specialized sub-modules for headers, downloads, matching, gaps, etc. +//! that delegates to specialized sub-modules for headers, downloads, matching, etc. use dashcore::{hash_types::FilterHeader, network::message_filter::CFHeaders, BlockHash}; use dashcore_hashes::{sha256d, Hash}; @@ -28,7 +28,7 @@ use super::types::*; /// Filter synchronization involves: /// - Downloading thousands of filter headers and filters /// - Complex flow control with parallel requests -/// - Retry logic and gap detection +/// - Retry logic /// - Storage operations for persistence /// /// Generic design enables: @@ -50,10 +50,6 @@ pub struct FilterSyncManager { pub(super) sync_base_height: u32, /// Last time sync progress was made (for timeout detection) pub(super) last_sync_progress: std::time::Instant, - /// Last time filter header tip height was checked for stability - pub(super) last_stability_check: std::time::Instant, - /// Filter tip height from last stability check - pub(super) last_filter_tip_height: Option, /// Whether filter sync is currently in progress pub(super) syncing_filters: bool, /// Queue of blocks that have been requested and are waiting for response @@ -62,8 +58,6 @@ pub struct FilterSyncManager { pub(super) downloading_blocks: HashMap, /// Blocks requested by the filter processing thread pub(super) processing_thread_requests: std::sync::Arc>>, - /// Track requested filter ranges: (start_height, end_height) -> request_time - pub(super) requested_filter_ranges: HashMap<(u32, u32), std::time::Instant>, /// Track individual filter heights that have been received (shared with stats) pub(super) received_filter_heights: SharedFilterHeights, /// Maximum retries for a filter range @@ -74,22 +68,10 @@ pub struct FilterSyncManager { pub(super) pending_filter_requests: VecDeque, /// Currently active filter requests (limited by MAX_CONCURRENT_FILTER_REQUESTS) pub(super) active_filter_requests: HashMap<(u32, u32), ActiveRequest>, - /// Whether flow control is enabled - pub(super) flow_control_enabled: bool, - /// Last time we detected a gap and attempted restart - pub(super) last_gap_restart_attempt: Option, - /// Minimum time between gap restart attempts (to prevent spam) - pub(super) gap_restart_cooldown: std::time::Duration, - /// Number of consecutive gap restart failures - pub(super) gap_restart_failure_count: u32, - /// Maximum gap restart attempts before giving up - pub(super) max_gap_restart_attempts: u32, /// Queue of pending CFHeaders requests pub(super) pending_cfheader_requests: VecDeque, /// Currently active CFHeaders requests: (start_height, stop_height) -> ActiveCFHeaderRequest pub(super) active_cfheader_requests: HashMap, - /// Whether CFHeaders flow control is enabled - pub(super) cfheaders_flow_control_enabled: bool, /// Retry counts per CFHeaders range: start_height -> retry_count pub(super) cfheader_retry_counts: HashMap, /// Maximum retries for CFHeaders @@ -115,31 +97,20 @@ impl super::manager::FilterSyncManager { - pub async fn check_filters_for_matches( - &self, - _storage: &S, - start_height: u32, - end_height: u32, - ) -> SyncResult> { - tracing::info!( - "Checking filters for matches from height {} to {}", - start_height, - end_height - ); - - // TODO: This will be integrated with wallet's check_compact_filter - // For now, return empty matches - Ok(Vec::new()) - } - pub async fn check_filter_for_matches< W: key_wallet_manager::wallet_interface::WalletInterface, >( @@ -286,168 +265,4 @@ impl, - _processing_thread_requests: std::sync::Arc< - tokio::sync::Mutex>, - >, - stats: std::sync::Arc>, - ) -> FilterNotificationSender { - let (filter_tx, mut filter_rx) = - mpsc::unbounded_channel::(); - - tokio::spawn(async move { - tracing::info!("🔄 Filter processing thread started (wallet integration pending)"); - - loop { - tokio::select! { - // Handle CFilter messages - Some(cfilter) = filter_rx.recv() => { - // TODO: Process filter with wallet - tracing::debug!("Received CFilter for block {} (wallet integration pending)", cfilter.block_hash); - // Update stats - Self::update_filter_received(&stats).await; - } - - // Exit when channel is closed - else => { - tracing::info!("🔄 Filter processing thread stopped"); - break; - } - } - } - }); - - filter_tx - } - - /* TODO: Re-implement with wallet integration - async fn process_filter_notification( - cfilter: dashcore::network::message_filter::CFilter, - network_message_sender: &mpsc::Sender, - processing_thread_requests: &std::sync::Arc< - tokio::sync::Mutex>, - >, - stats: &std::sync::Arc>, - ) -> SyncResult<()> { - // Update filter reception tracking - Self::update_filter_received(stats).await; - - if watch_items.is_empty() { - return Ok(()); - } - - // Convert watch items to scripts for filter checking - let mut scripts = Vec::with_capacity(watch_items.len()); - for item in watch_items { - match item { - crate::types::WatchItem::Address { - address, - .. - } => { - scripts.push(address.script_pubkey()); - } - crate::types::WatchItem::Script(script) => { - scripts.push(script.clone()); - } - crate::types::WatchItem::Outpoint(_) => { - // Skip outpoints for now - } - } - } - - if scripts.is_empty() { - return Ok(()); - } - - // Check if the filter matches any of our scripts - let matches = Self::check_filter_matches(&cfilter.filter, &cfilter.block_hash, &scripts)?; - - if matches { - tracing::info!( - "🎯 Filter match found in processing thread for block {}", - cfilter.block_hash - ); - - // Update filter match statistics - { - let mut stats_lock = stats.write().await; - stats_lock.filters_matched += 1; - } - - // Register this request in the processing thread tracking - { - let mut requests = processing_thread_requests.lock().await; - requests.insert(cfilter.block_hash); - tracing::debug!( - "Registered block {} in processing thread requests", - cfilter.block_hash - ); - } - - // Request the full block download - let inv = dashcore::network::message_blockdata::Inventory::Block(cfilter.block_hash); - let getdata = dashcore::network::message::NetworkMessage::GetData(vec![inv]); - - if let Err(e) = network_message_sender.send(getdata).await { - tracing::error!("Failed to request block download for match: {}", e); - // Remove from tracking if request failed - { - let mut requests = processing_thread_requests.lock().await; - requests.remove(&cfilter.block_hash); - } - } else { - tracing::info!( - "📦 Requested block download for filter match: {}", - cfilter.block_hash - ); - } - } - - Ok(()) - } - */ - - /* TODO: Re-implement with wallet integration - fn check_filter_matches( - filter_data: &[u8], - block_hash: &BlockHash, - scripts: &[ScriptBuf], - ) -> SyncResult { - if scripts.is_empty() || filter_data.is_empty() { - return Ok(false); - } - - // Create a BlockFilterReader with the block hash for proper key derivation - let filter_reader = BlockFilterReader::new(block_hash); - - // Convert scripts to byte slices for matching - let mut script_bytes = Vec::with_capacity(scripts.len()); - for script in scripts { - script_bytes.push(script.as_bytes()); - } - - // Use the BIP158 filter to check if any scripts match - let mut filter_slice = filter_data; - match filter_reader.match_any(&mut filter_slice, script_bytes.into_iter()) { - Ok(matches) => { - if matches { - tracing::info!( - "BIP158 filter match found! Block {} contains watched scripts", - block_hash - ); - } - Ok(matches) - } - Err(Bip158Error::Io(e)) => { - Err(SyncError::Storage(format!("BIP158 filter IO error: {}", e))) - } - Err(Bip158Error::UtxoMissing(outpoint)) => { - Err(SyncError::Validation(format!("BIP158 filter UTXO missing: {}", outpoint))) - } - Err(_) => Err(SyncError::Validation("BIP158 filter error".to_string())), - } - } - */ } diff --git a/dash-spv/src/sync/filters/mod.rs b/dash-spv/src/sync/filters/mod.rs index 626e12326..5bad427dd 100644 --- a/dash-spv/src/sync/filters/mod.rs +++ b/dash-spv/src/sync/filters/mod.rs @@ -9,7 +9,6 @@ //! - `headers` - CFHeaders synchronization //! - `download` - CFilter download logic //! - `matching` - Filter matching against wallet -//! - `gaps` - Gap detection and recovery //! - `retry` - Retry and timeout logic //! - `stats` - Statistics and progress tracking //! - `requests` - Request queue management @@ -20,10 +19,8 @@ //! 1. pending_requests //! 2. active_requests //! 3. received_heights -//! 4. gap_tracker pub mod download; -pub mod gaps; pub mod headers; pub mod manager; pub mod matching; diff --git a/dash-spv/src/sync/filters/requests.rs b/dash-spv/src/sync/filters/requests.rs index 8d2d50ab2..6115576b9 100644 --- a/dash-spv/src/sync/filters/requests.rs +++ b/dash-spv/src/sync/filters/requests.rs @@ -130,7 +130,7 @@ impl SyncResult<()> { - if !self.flow_control_enabled { - return Ok(()); - } - let available_slots = MAX_CONCURRENT_FILTER_REQUESTS.saturating_sub(self.active_filter_requests.len()); let mut sent_count = 0; diff --git a/dash-spv/src/sync/filters/retry.rs b/dash-spv/src/sync/filters/retry.rs index 3f02752a9..116337501 100644 --- a/dash-spv/src/sync/filters/retry.rs +++ b/dash-spv/src/sync/filters/retry.rs @@ -156,14 +156,13 @@ impl SyncResult<()> { - if !self.cfheaders_flow_control_enabled || !self.syncing_filter_headers { + if !self.syncing_filter_headers { return Ok(()); } @@ -234,19 +233,11 @@ impl SyncResult<()> { - if !self.flow_control_enabled { - // Fall back to original timeout checking - return self.check_and_retry_missing_filters(network, storage).await; - } - let now = std::time::Instant::now(); let timeout_duration = std::time::Duration::from_secs(REQUEST_TIMEOUT_SECONDS); @@ -346,36 +337,4 @@ impl Vec<(u32, u32)> { - let now = std::time::Instant::now(); - let mut timed_out = Vec::new(); - - let heights = match self.received_filter_heights.try_lock() { - Ok(heights) => heights.clone(), - Err(_) => return timed_out, - }; - - for ((start, end), request_time) in &self.requested_filter_ranges { - if now.duration_since(*request_time) > timeout_duration { - // Check if this range is incomplete - let mut is_incomplete = false; - for height in *start..=*end { - if !heights.contains(&height) { - is_incomplete = true; - break; - } - } - - if is_incomplete { - timed_out.push((*start, *end)); - } - } - } - - timed_out - } } diff --git a/dash-spv/src/sync/filters/stats.rs b/dash-spv/src/sync/filters/stats.rs index b9cbad370..1ea30cd60 100644 --- a/dash-spv/src/sync/filters/stats.rs +++ b/dash-spv/src/sync/filters/stats.rs @@ -3,21 +3,16 @@ use super::types::*; use crate::network::NetworkManager; use crate::storage::StorageManager; -use dashcore::BlockHash; impl super::manager::FilterSyncManager { - /// Get flow control status (pending count, active count, enabled). - pub fn get_flow_control_status(&self) -> (usize, usize, bool) { - ( - self.pending_filter_requests.len(), - self.active_filter_requests.len(), - self.flow_control_enabled, - ) + /// Get state (pending count, active count). + pub fn get_filter_sync_state(&self) -> (usize, usize) { + (self.pending_filter_requests.len(), self.active_filter_requests.len()) } - /// Get number of available request slots for flow control. + /// Get number of available request slots. pub fn get_available_request_slots(&self) -> usize { MAX_CONCURRENT_FILTER_REQUESTS.saturating_sub(self.active_filter_requests.len()) } @@ -29,205 +24,4 @@ impl 0, } } - - /// Start tracking filter sync progress. - /// - /// If a sync session is already in progress, adds to the existing count. - /// Otherwise, starts a fresh tracking session. - pub async fn start_filter_sync_tracking( - stats: &std::sync::Arc>, - total_filters_requested: u64, - ) { - let mut stats_lock = stats.write().await; - - // If we're starting a new sync session while one is already in progress, - // add to the existing count instead of resetting - if stats_lock.filter_sync_start_time.is_some() { - // Accumulate the new request count - stats_lock.filters_requested += total_filters_requested; - tracing::info!( - "📊 Added {} filters to existing sync tracking (total: {} filters requested)", - total_filters_requested, - stats_lock.filters_requested - ); - } else { - // Fresh start - reset everything - stats_lock.filters_requested = total_filters_requested; - stats_lock.filters_received = 0; - stats_lock.filter_sync_start_time = Some(std::time::Instant::now()); - stats_lock.last_filter_received_time = None; - // Clear the received heights tracking for a fresh start - let received_filter_heights = stats_lock.received_filter_heights.clone(); - drop(stats_lock); // Release the RwLock before awaiting the mutex - let mut heights = received_filter_heights.lock().await; - heights.clear(); - tracing::info!( - "📊 Started new filter sync tracking: {} filters requested", - total_filters_requested - ); - } - } - - /// Complete filter sync tracking (marks the sync session as complete). - pub async fn complete_filter_sync_tracking( - stats: &std::sync::Arc>, - ) { - let mut stats_lock = stats.write().await; - stats_lock.filter_sync_start_time = None; - tracing::info!("📊 Completed filter sync tracking"); - } - - /// Update filter reception tracking. - pub async fn update_filter_received( - stats: &std::sync::Arc>, - ) { - let mut stats_lock = stats.write().await; - stats_lock.filters_received += 1; - stats_lock.last_filter_received_time = Some(std::time::Instant::now()); - } - - /// Record filter received at specific height (used by processing thread). - pub async fn record_filter_received_at_height( - stats: &std::sync::Arc>, - storage: &S, - block_hash: &BlockHash, - ) { - // Look up height for the block hash - if let Ok(Some(height)) = storage.get_header_height_by_hash(block_hash).await { - // Increment the received counter so high-level progress reflects the update - Self::update_filter_received(stats).await; - - // Get the shared filter heights arc from stats - let stats_lock = stats.read().await; - let received_filter_heights = stats_lock.received_filter_heights.clone(); - drop(stats_lock); // Release the stats lock before acquiring the mutex - - // Now lock the heights and insert - let mut heights = received_filter_heights.lock().await; - heights.insert(height); - tracing::trace!( - "📊 Recorded filter received at height {} for block {}", - height, - block_hash - ); - } else { - tracing::warn!("Could not find height for filter block hash {}", block_hash); - } - } - - /// Get filter sync progress as percentage. - pub async fn get_filter_sync_progress( - stats: &std::sync::Arc>, - ) -> f64 { - let stats_lock = stats.read().await; - if stats_lock.filters_requested == 0 { - return 0.0; - } - (stats_lock.filters_received as f64 / stats_lock.filters_requested as f64) * 100.0 - } - - /// Check if filter sync has timed out (no filters received for 30+ seconds). - pub async fn check_filter_sync_timeout( - stats: &std::sync::Arc>, - ) -> bool { - let stats_lock = stats.read().await; - if let Some(last_received) = stats_lock.last_filter_received_time { - last_received.elapsed() > std::time::Duration::from_secs(30) - } else if let Some(sync_start) = stats_lock.filter_sync_start_time { - // No filters received yet, check if we've been waiting too long - sync_start.elapsed() > std::time::Duration::from_secs(30) - } else { - false - } - } - - /// Get filter sync status information. - /// - /// Returns: (filters_requested, filters_received, progress_percentage, is_timeout) - pub async fn get_filter_sync_status( - stats: &std::sync::Arc>, - ) -> (u64, u64, f64, bool) { - let stats_lock = stats.read().await; - let progress = if stats_lock.filters_requested == 0 { - 0.0 - } else { - (stats_lock.filters_received as f64 / stats_lock.filters_requested as f64) * 100.0 - }; - - let timeout = if let Some(last_received) = stats_lock.last_filter_received_time { - last_received.elapsed() > std::time::Duration::from_secs(30) - } else if let Some(sync_start) = stats_lock.filter_sync_start_time { - sync_start.elapsed() > std::time::Duration::from_secs(30) - } else { - false - }; - - (stats_lock.filters_requested, stats_lock.filters_received, progress, timeout) - } - - /// Get enhanced filter sync status with gap information. - /// - /// This function provides comprehensive filter sync status by combining: - /// 1. Basic progress tracking (filters_received vs filters_requested) - /// 2. Gap analysis of active filter requests - /// 3. Correction logic for tracking inconsistencies - /// - /// The function addresses a bug where completion could be incorrectly reported - /// when active request tracking (requested_filter_ranges) was empty but - /// basic progress indicated incomplete sync. This could happen when filter - /// range requests were marked complete but individual filters within those - /// ranges were never actually received. - /// - /// Returns: (filters_requested, filters_received, basic_progress, timeout, total_missing, actual_coverage, missing_ranges) - pub async fn get_filter_sync_status_with_gaps( - stats: &std::sync::Arc>, - filter_sync: &super::manager::FilterSyncManager, - ) -> (u64, u64, f64, bool, u32, f64, Vec<(u32, u32)>) { - let stats_lock = stats.read().await; - let basic_progress = if stats_lock.filters_requested == 0 { - 0.0 - } else { - (stats_lock.filters_received as f64 / stats_lock.filters_requested as f64) * 100.0 - }; - - let timeout = if let Some(last_received) = stats_lock.last_filter_received_time { - last_received.elapsed() > std::time::Duration::from_secs(30) - } else if let Some(sync_start) = stats_lock.filter_sync_start_time { - sync_start.elapsed() > std::time::Duration::from_secs(30) - } else { - false - }; - - // Get gap information from active requests - let missing_ranges = filter_sync.find_missing_ranges(); - let total_missing = filter_sync.get_total_missing_filters(); - let actual_coverage = filter_sync.get_actual_coverage_percentage(); - - // If active request tracking shows no gaps but basic progress indicates incomplete sync, - // we may have a tracking inconsistency. In this case, trust the basic progress calculation. - let corrected_total_missing = if total_missing == 0 - && stats_lock.filters_received < stats_lock.filters_requested - { - // Gap detection failed, but basic stats show incomplete sync - tracing::debug!( - "Gap detection shows complete ({}), but basic progress shows {}/{} - treating as incomplete", - total_missing, - stats_lock.filters_received, - stats_lock.filters_requested - ); - (stats_lock.filters_requested - stats_lock.filters_received) as u32 - } else { - total_missing - }; - - ( - stats_lock.filters_requested, - stats_lock.filters_received, - basic_progress, - timeout, - corrected_total_missing, - actual_coverage, - missing_ranges, - ) - } } diff --git a/dash-spv/src/sync/message_handlers.rs b/dash-spv/src/sync/message_handlers.rs index a9763e4e2..7c40aa0da 100644 --- a/dash-spv/src/sync/message_handlers.rs +++ b/dash-spv/src/sync/message_handlers.rs @@ -623,6 +623,26 @@ impl< drop(wallet); + { + let mut stats_lock = self.stats.write().await; + stats_lock.filters_received += 1; + stats_lock.last_filter_received_time = Some(std::time::Instant::now()); + } + + // Get the shared filter heights arc from stats + let stats_lock = self.stats.read().await; + let received_filter_heights = stats_lock.received_filter_heights.clone(); + drop(stats_lock); // Release the stats lock before acquiring the mutex + + // Now lock the heights and insert + let mut heights = received_filter_heights.lock().await; + heights.insert(height); + tracing::trace!( + "📊 Recorded filter received at height {} for block {}", + height, + cfilter.block_hash + ); + if matches { // Update filter match statistics { @@ -697,7 +717,7 @@ impl< .. } = &self.current_phase { - // For flow control, we need to check: + // We need to check: // 1. All expected filters have been received (completed_heights matches total_filters) // 2. No more active or pending requests let has_pending = self.filter_sync.pending_download_count() > 0 diff --git a/dash-spv/src/sync/phase_execution.rs b/dash-spv/src/sync/phase_execution.rs index 4e76d0ea4..b2f8ff155 100644 --- a/dash-spv/src/sync/phase_execution.rs +++ b/dash-spv/src/sync/phase_execution.rs @@ -112,14 +112,8 @@ impl< self.filter_sync.set_sync_base_height(sync_base_height); } - // Use flow control if enabled, otherwise use single-request mode - let sync_started = if self.config.enable_cfheaders_flow_control { - tracing::info!("Using CFHeaders flow control for parallel sync"); - self.filter_sync.start_sync_headers_with_flow_control(network, storage).await? - } else { - tracing::info!("Using single-request CFHeaders sync (flow control disabled)"); - self.filter_sync.start_sync_headers(network, storage).await? - }; + let sync_started = + self.filter_sync.start_sync_filter_headers(network, storage).await?; if !sync_started { // No peers support compact filters or already up to date @@ -173,12 +167,7 @@ impl< // Use the filter sync manager to download filters self.filter_sync - .sync_filters_with_flow_control( - network, - storage, - Some(start_height), - Some(count), - ) + .sync_filters(network, storage, Some(start_height), Some(count)) .await?; } else { // No filter headers available, skip to next phase @@ -324,11 +313,7 @@ impl< SyncPhase::DownloadingCFHeaders { .. } => { - if self.config.enable_cfheaders_flow_control { - self.filter_sync.check_cfheader_request_timeouts(network, storage).await?; - } else { - self.filter_sync.check_sync_timeout(storage, network).await?; - } + self.filter_sync.check_cfheader_request_timeouts(network, storage).await?; } SyncPhase::DownloadingMnList { .. @@ -480,11 +465,7 @@ impl< SyncPhase::DownloadingCFHeaders { .. } => { - if self.config.enable_cfheaders_flow_control { - self.filter_sync.check_cfheader_request_timeouts(network, storage).await?; - } else { - self.filter_sync.check_sync_timeout(storage, network).await?; - } + self.filter_sync.check_cfheader_request_timeouts(network, storage).await?; } _ => { // For other phases, we'll need phase-specific recovery diff --git a/dash-spv/tests/cfheader_gap_test.rs b/dash-spv/tests/cfheader_gap_test.rs deleted file mode 100644 index 4f45aca65..000000000 --- a/dash-spv/tests/cfheader_gap_test.rs +++ /dev/null @@ -1,256 +0,0 @@ -//! Tests for CFHeader gap detection and auto-restart functionality. -//! -//! NOTE: This test file is currently ignored due to incomplete mock NetworkManager implementation. -//! TODO: Re-enable once NetworkManager trait methods are fully implemented. - -//! Tests for CFHeader gap detection and auto-restart functionality. - -use std::collections::HashSet; -use std::sync::Arc; -use tokio::sync::Mutex; - -use dash_spv::{ - client::ClientConfig, - network::PeerNetworkManager, - storage::{MemoryStorageManager, StorageManager}, - sync::filters::FilterSyncManager, -}; -use dashcore::{block::Header as BlockHeader, hash_types::FilterHeader, BlockHash, Network}; -use dashcore_hashes::Hash; - -/// Create a mock block header -fn create_mock_header(height: u32) -> BlockHeader { - BlockHeader { - version: dashcore::block::Version::ONE, - prev_blockhash: BlockHash::all_zeros(), - merkle_root: dashcore::hash_types::TxMerkleNode::all_zeros(), - time: 1234567890 + height, - bits: dashcore::pow::CompactTarget::from_consensus(0x1d00ffff), - nonce: height, - } -} - -/// Create a mock filter header -fn create_mock_filter_header() -> FilterHeader { - FilterHeader::all_zeros() -} - -#[tokio::test] -#[ignore = "mock NetworkManager implementation incomplete"] -async fn test_cfheader_gap_detection_no_gap() { - let config = ClientConfig::new(Network::Dash); - let received_heights = Arc::new(Mutex::new(HashSet::new())); - let filter_sync: FilterSyncManager = - FilterSyncManager::::new( - &config, - received_heights, - ); - - let mut storage = MemoryStorageManager::new().await.unwrap(); - - // Store 100 block headers and 100 filter headers (no gap) - let mut headers = Vec::new(); - let mut filter_headers = Vec::new(); - - for i in 1..=100 { - headers.push(create_mock_header(i)); - filter_headers.push(create_mock_filter_header()); - } - - storage.store_headers(&headers).await.unwrap(); - storage.store_filter_headers(&filter_headers).await.unwrap(); - - // Check gap detection - let (has_gap, block_height, filter_height, gap_size) = - filter_sync.check_cfheader_gap(&storage).await.unwrap(); - - assert!(!has_gap, "Should not detect gap when heights are equal"); - assert_eq!(block_height, 99); // 0-indexed, so 100 headers = height 99 - assert_eq!(filter_height, 99); - assert_eq!(gap_size, 0); -} - -#[tokio::test] -#[ignore = "mock NetworkManager implementation incomplete"] -async fn test_cfheader_gap_detection_with_gap() { - let config = ClientConfig::new(Network::Dash); - let received_heights = Arc::new(Mutex::new(HashSet::new())); - let filter_sync: FilterSyncManager = - FilterSyncManager::::new( - &config, - received_heights, - ); - - let mut storage = MemoryStorageManager::new().await.unwrap(); - - // Store 200 block headers but only 150 filter headers (gap of 50) - let mut headers = Vec::new(); - let mut filter_headers = Vec::new(); - - for i in 1..=200 { - headers.push(create_mock_header(i)); - } - - for _i in 1..=150 { - filter_headers.push(create_mock_filter_header()); - } - - storage.store_headers(&headers).await.unwrap(); - storage.store_filter_headers(&filter_headers).await.unwrap(); - - // Check gap detection - let (has_gap, block_height, filter_height, gap_size) = - filter_sync.check_cfheader_gap(&storage).await.unwrap(); - - assert!(has_gap, "Should detect gap when block headers > filter headers"); - assert_eq!(block_height, 199); // 0-indexed, so 200 headers = height 199 - assert_eq!(filter_height, 149); // 0-indexed, so 150 headers = height 149 - assert_eq!(gap_size, 50); -} - -#[tokio::test] -#[ignore = "mock NetworkManager implementation incomplete"] -async fn test_cfheader_gap_detection_filter_ahead() { - let config = ClientConfig::new(Network::Dash); - let received_heights = Arc::new(Mutex::new(HashSet::new())); - let filter_sync: FilterSyncManager = - FilterSyncManager::::new( - &config, - received_heights, - ); - - let mut storage = MemoryStorageManager::new().await.unwrap(); - - // Store 100 block headers but 120 filter headers (filter ahead - no gap) - let mut headers = Vec::new(); - let mut filter_headers = Vec::new(); - - for i in 1..=100 { - headers.push(create_mock_header(i)); - } - - for _i in 1..=120 { - filter_headers.push(create_mock_filter_header()); - } - - storage.store_headers(&headers).await.unwrap(); - storage.store_filter_headers(&filter_headers).await.unwrap(); - - // Check gap detection - let (has_gap, block_height, filter_height, gap_size) = - filter_sync.check_cfheader_gap(&storage).await.unwrap(); - - assert!(!has_gap, "Should not detect gap when filter headers >= block headers"); - assert_eq!(block_height, 99); // 0-indexed, so 100 headers = height 99 - assert_eq!(filter_height, 119); // 0-indexed, so 120 headers = height 119 - assert_eq!(gap_size, 0); -} - -#[tokio::test] -#[ignore = "mock NetworkManager implementation incomplete"] -async fn test_cfheader_restart_cooldown() { - let mut config = ClientConfig::new(Network::Dash); - config.cfheader_gap_restart_cooldown_secs = 1; // 1 second cooldown for testing - - // FilterSyncManager instantiation omitted until restart logic is implemented - - let mut storage = MemoryStorageManager::new().await.unwrap(); - - // Store headers with a gap - let mut headers = Vec::new(); - let mut filter_headers = Vec::new(); - - for i in 1..=200 { - headers.push(create_mock_header(i)); - } - - for _i in 1..=100 { - filter_headers.push(create_mock_filter_header()); - } - - storage.store_headers(&headers).await.unwrap(); - storage.store_filter_headers(&filter_headers).await.unwrap(); - - // Network manager mock omitted until restart logic exists - - /* - #[async_trait::async_trait] - impl NetworkManager for MockNetworkManager { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - async fn connect(&mut self) -> NetworkResult<()> { - Ok(()) - } - - async fn disconnect(&mut self) -> NetworkResult<()> { - Ok(()) - } - - async fn send_message(&mut self, _message: NetworkMessage) -> NetworkResult<()> { - Err(NetworkError::ConnectionFailed("Mock failure".to_string())) - } - - async fn receive_message(&mut self) -> NetworkResult> { - Ok(None) - } - - fn is_connected(&self) -> bool { - true - } - - fn peer_count(&self) -> usize { - 1 - } - - fn peer_info(&self) -> Vec { - Vec::new() - } - - async fn get_peer_best_height(&self) -> dash_spv::error::NetworkResult> { - Ok(Some(100)) - } - - async fn has_peer_with_service( - &self, - _service_flags: dashcore::network::constants::ServiceFlags, - ) -> bool { - true - } - - async fn get_last_message_peer_id(&self) -> dash_spv::types::PeerId { - dash_spv::types::PeerId(1) - } - - async fn update_peer_dsq_preference(&mut self, _wants_dsq: bool) -> NetworkResult<()> { - Ok(()) - } - } - */ - - // Network manager omitted until restart logic is implemented - - // Note: The following tests are skipped because MockNetworkManager doesn't implement - // the full PeerNetworkManager interface required by maybe_restart_cfheader_sync_for_gap - // First attempt should try to restart (and fail) - // let result1 = filter_sync.maybe_restart_cfheader_sync_for_gap(&mut network, &mut storage).await; - // assert!(result1.is_err(), "First restart attempt should fail with mock network"); - - // Second attempt immediately should be blocked by cooldown - // let result2 = filter_sync.maybe_restart_cfheader_sync_for_gap(&mut network, &mut storage).await; - // assert!(result2.is_ok(), "Second attempt should not error"); - // assert!(!result2.unwrap(), "Second attempt should return false due to cooldown"); - - // Wait for cooldown to expire - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - - // Third attempt should try again (and fail) - // let result3 = filter_sync.maybe_restart_cfheader_sync_for_gap(&mut network, &mut storage).await; - // The third attempt should either fail (if trying to restart) or return Ok(false) if max attempts reached - // let should_fail_or_be_disabled = result3.is_err() || (result3.is_ok() && !result3.unwrap()); - // assert!( - // should_fail_or_be_disabled, - // "Third restart attempt should fail or be disabled after cooldown" - // ); -} diff --git a/dash-spv/tests/edge_case_filter_sync_test.rs b/dash-spv/tests/edge_case_filter_sync_test.rs index 3fd51156a..23b71517b 100644 --- a/dash-spv/tests/edge_case_filter_sync_test.rs +++ b/dash-spv/tests/edge_case_filter_sync_test.rs @@ -156,68 +156,6 @@ async fn test_filter_sync_at_tip_edge_case() { assert_eq!(sent_messages.len(), 0, "Should not send any messages when at tip"); } -#[ignore = "mock implementation incomplete"] -#[tokio::test] -async fn test_filter_sync_gap_detection_edge_case() { - let config = ClientConfig::new(Network::Dash); - let received_heights = Arc::new(Mutex::new(HashSet::new())); - let filter_sync: FilterSyncManager = - FilterSyncManager::new(&config, received_heights); - - let mut storage = MemoryStorageManager::new().await.unwrap(); - - // Test case 1: No gap (same height) - let height = 1000; - let mut headers = Vec::new(); - let mut filter_headers = Vec::new(); - let mut prev_hash = BlockHash::all_zeros(); - - for i in 1..=height { - let header = create_mock_header(i, prev_hash); - prev_hash = header.block_hash(); - headers.push(header); - filter_headers.push(create_mock_filter_header(i)); - } - - storage.store_headers(&headers).await.unwrap(); - storage.store_filter_headers(&filter_headers).await.unwrap(); - - let (has_gap, block_height, filter_height, gap_size) = - filter_sync.check_cfheader_gap(&storage).await.unwrap(); - - assert!(!has_gap, "Should not detect gap when heights are equal"); - assert_eq!(block_height, height - 1); // 0-indexed - assert_eq!(filter_height, height - 1); - assert_eq!(gap_size, 0); - - // Test case 2: Gap of 1 (considered no gap) - // Add one more header to create a gap of 1 - let next_header = create_mock_header(height + 1, prev_hash); - storage.store_headers(&[next_header]).await.unwrap(); - - let (has_gap, block_height, filter_height, gap_size) = - filter_sync.check_cfheader_gap(&storage).await.unwrap(); - - assert!(!has_gap, "Should not detect gap when difference is only 1 block"); - assert_eq!(block_height, height); // 0-indexed, so 1001 blocks = height 1000 - assert_eq!(filter_height, height - 1); - assert_eq!(gap_size, 1); - - // Test case 3: Gap of 2 (should be detected) - // Add one more header to create a gap of 2 - prev_hash = next_header.block_hash(); - let next_header2 = create_mock_header(height + 2, prev_hash); - storage.store_headers(&[next_header2]).await.unwrap(); - - let (has_gap, block_height, filter_height, gap_size) = - filter_sync.check_cfheader_gap(&storage).await.unwrap(); - - assert!(has_gap, "Should detect gap when difference is 2 or more blocks"); - assert_eq!(block_height, height + 1); // 0-indexed - assert_eq!(filter_height, height - 1); - assert_eq!(gap_size, 2); -} - #[ignore = "mock implementation incomplete"] #[tokio::test] async fn test_no_invalid_getcfheaders_at_tip() { diff --git a/dash-spv/tests/simple_gap_test.rs b/dash-spv/tests/simple_gap_test.rs deleted file mode 100644 index b04bd9161..000000000 --- a/dash-spv/tests/simple_gap_test.rs +++ /dev/null @@ -1,51 +0,0 @@ -//! Basic test for CFHeader gap detection functionality. - -use std::collections::HashSet; -use std::sync::Arc; -use tokio::sync::Mutex; - -use dash_spv::{ - client::ClientConfig, - storage::{MemoryStorageManager, StorageManager}, - sync::filters::FilterSyncManager, -}; -use dashcore::{block::Header as BlockHeader, BlockHash, Network}; -use dashcore_hashes::Hash; - -/// Create a mock block header -fn create_mock_header(height: u32) -> BlockHeader { - BlockHeader { - version: dashcore::block::Version::ONE, - prev_blockhash: BlockHash::all_zeros(), - merkle_root: dashcore::hash_types::TxMerkleNode::all_zeros(), - time: 1234567890 + height, - bits: dashcore::pow::CompactTarget::from_consensus(0x1d00ffff), - nonce: height, - } -} - -#[tokio::test] -async fn test_basic_gap_detection() { - let config = ClientConfig::new(Network::Dash); - let received_heights = Arc::new(Mutex::new(HashSet::new())); - use dash_spv::network::PeerNetworkManager; - let filter_sync: FilterSyncManager = - FilterSyncManager::new(&config, received_heights); - - let mut storage = MemoryStorageManager::new().await.unwrap(); - - // Store just a few headers to test basic functionality - let headers = vec![create_mock_header(1), create_mock_header(2), create_mock_header(3)]; - - storage.store_headers(&headers).await.unwrap(); - - // Check gap detection - should detect gap since no filter headers stored - let result = filter_sync.check_cfheader_gap(&storage).await; - assert!(result.is_ok(), "Gap detection should not error"); - - let (has_gap, block_height, filter_height, gap_size) = result.unwrap(); - assert!(has_gap, "Should detect gap when no filter headers exist"); - assert!(block_height > 0, "Block height should be > 0"); - assert_eq!(filter_height, 0, "Filter height should be 0"); - assert_eq!(gap_size, block_height, "Gap size should equal block height when no filter headers"); -}