Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 0 additions & 54 deletions dash-spv/src/client/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,42 +78,9 @@ pub struct ClientConfig {
/// Maximum concurrent filter requests (default: 8).
pub max_concurrent_filter_requests: usize,

/// Enable flow control for filter requests (default: true).
pub enable_filter_flow_control: bool,

/// Delay between filter requests in milliseconds (default: 50).
pub filter_request_delay_ms: u64,

/// Enable automatic CFHeader gap detection and restart
pub enable_cfheader_gap_restart: bool,

/// Interval for checking CFHeader gaps (seconds)
pub cfheader_gap_check_interval_secs: u64,

/// Cooldown between CFHeader restart attempts (seconds)
pub cfheader_gap_restart_cooldown_secs: u64,

/// Maximum CFHeader gap restart attempts
pub max_cfheader_gap_restart_attempts: u32,

/// Enable automatic filter gap detection and restart
pub enable_filter_gap_restart: bool,

/// Interval for checking filter gaps (seconds)
pub filter_gap_check_interval_secs: u64,

/// Minimum filter gap size to trigger restart (blocks)
pub min_filter_gap_size: u32,

/// Cooldown between filter restart attempts (seconds)
pub filter_gap_restart_cooldown_secs: u64,

/// Maximum filter gap restart attempts
pub max_filter_gap_restart_attempts: u32,

/// Maximum number of filters to sync in a single gap sync batch
pub max_filter_gap_sync_size: u32,

// Mempool configuration
/// Enable tracking of unconfirmed (mempool) transactions.
pub enable_mempool_tracking: bool,
Expand Down Expand Up @@ -159,9 +126,6 @@ pub struct ClientConfig {
/// Maximum concurrent CFHeaders requests for parallel sync (default: 50).
pub max_concurrent_cfheaders_requests_parallel: usize,

/// Enable flow control for CFHeaders requests (default: true).
pub enable_cfheaders_flow_control: bool,

/// Timeout for CFHeaders requests in seconds (default: 30).
pub cfheaders_request_timeout_secs: u64,

Expand Down Expand Up @@ -210,18 +174,7 @@ impl Default for ClientConfig {
log_level: "info".to_string(),
user_agent: None,
max_concurrent_filter_requests: 16,
enable_filter_flow_control: true,
filter_request_delay_ms: 0,
enable_cfheader_gap_restart: true,
cfheader_gap_check_interval_secs: 15,
cfheader_gap_restart_cooldown_secs: 30,
max_cfheader_gap_restart_attempts: 5,
enable_filter_gap_restart: true,
filter_gap_check_interval_secs: 20,
min_filter_gap_size: 10,
filter_gap_restart_cooldown_secs: 30,
max_filter_gap_restart_attempts: 5,
max_filter_gap_sync_size: 50000,
// Mempool defaults
enable_mempool_tracking: true,
mempool_strategy: MempoolStrategy::FetchAll,
Expand All @@ -243,7 +196,6 @@ impl Default for ClientConfig {
wallet_creation_time: None,
// CFHeaders flow control defaults
max_concurrent_cfheaders_requests_parallel: 50,
enable_cfheaders_flow_control: true,
cfheaders_request_timeout_secs: 30,
max_cfheaders_retries: 3,
// QRInfo defaults (simplified per plan)
Expand Down Expand Up @@ -341,12 +293,6 @@ impl ClientConfig {
self
}

/// Enable or disable filter flow control.
pub fn with_filter_flow_control(mut self, enabled: bool) -> Self {
self.enable_filter_flow_control = enabled;
self
}

/// Set delay between filter requests.
pub fn with_filter_request_delay(mut self, delay_ms: u64) -> Self {
self.filter_request_delay_ms = delay_ms;
Expand Down
25 changes: 0 additions & 25 deletions dash-spv/src/client/config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ mod tests {
assert!(config.enable_persistence);
assert_eq!(config.log_level, "info");
assert_eq!(config.max_concurrent_filter_requests, 16);
assert!(config.enable_filter_flow_control);
assert_eq!(config.filter_request_delay_ms, 0);

// Mempool defaults
Expand Down Expand Up @@ -66,7 +65,6 @@ mod tests {
.with_connection_timeout(Duration::from_secs(10))
.with_log_level("debug")
.with_max_concurrent_filter_requests(32)
.with_filter_flow_control(false)
.with_filter_request_delay(100)
.with_mempool_tracking(MempoolStrategy::BloomFilter)
.with_max_mempool_transactions(500)
Expand All @@ -80,7 +78,6 @@ mod tests {
assert_eq!(config.connection_timeout, Duration::from_secs(10));
assert_eq!(config.log_level, "debug");
assert_eq!(config.max_concurrent_filter_requests, 32);
assert!(!config.enable_filter_flow_control);
assert_eq!(config.filter_request_delay_ms, 100);

// Mempool settings
Expand Down Expand Up @@ -196,28 +193,6 @@ mod tests {

// Removed selective strategy validation test; Selective variant no longer exists

#[test]
fn test_cfheader_gap_settings() {
let config = ClientConfig::default();

assert!(config.enable_cfheader_gap_restart);
assert_eq!(config.cfheader_gap_check_interval_secs, 15);
assert_eq!(config.cfheader_gap_restart_cooldown_secs, 30);
assert_eq!(config.max_cfheader_gap_restart_attempts, 5);
}

#[test]
fn test_filter_gap_settings() {
let config = ClientConfig::default();

assert!(config.enable_filter_gap_restart);
assert_eq!(config.filter_gap_check_interval_secs, 20);
assert_eq!(config.min_filter_gap_size, 10);
assert_eq!(config.filter_gap_restart_cooldown_secs, 30);
assert_eq!(config.max_filter_gap_restart_attempts, 5);
assert_eq!(config.max_filter_gap_sync_size, 50000);
}

#[test]
fn test_request_control_defaults() {
let config = ClientConfig::default();
Expand Down
171 changes: 0 additions & 171 deletions dash-spv/src/client/filter_sync.rs
Original file line number Diff line number Diff line change
@@ -1,171 +0,0 @@
//! Filter synchronization and management for the Dash SPV client.

use crate::error::{Result, SpvError};
use crate::network::NetworkManager;
use crate::storage::StorageManager;
use crate::sync::SyncManager;
use crate::types::FilterMatch;
use crate::types::SpvStats;
use key_wallet_manager::wallet_interface::WalletInterface;
use std::sync::Arc;
use tokio::sync::RwLock;

/// Filter synchronization manager for coordinating filter downloads and checking.
pub struct FilterSyncCoordinator<'a, S: StorageManager, N: NetworkManager, W: WalletInterface> {
sync_manager: &'a mut SyncManager<S, N, W>,
storage: &'a mut S,
network: &'a mut N,
stats: &'a Arc<RwLock<SpvStats>>,
running: &'a Arc<RwLock<bool>>,
}

impl<
'a,
S: StorageManager + Send + Sync + 'static,
N: NetworkManager + Send + Sync + 'static,
W: WalletInterface,
> FilterSyncCoordinator<'a, S, N, W>
{
/// Create a new filter sync coordinator.
pub fn new(
sync_manager: &'a mut SyncManager<S, N, W>,
storage: &'a mut S,
network: &'a mut N,
stats: &'a Arc<RwLock<SpvStats>>,
running: &'a Arc<RwLock<bool>>,
) -> Self {
Self {
sync_manager,
storage,
network,
stats,
running,
}
}

/// Sync compact filters for recent blocks and check for matches.
/// Sync and check filters with internal monitoring loop management.
/// This method automatically handles the monitoring loop required for CFilter message processing.
pub async fn sync_and_check_filters_with_monitoring(
&mut self,
num_blocks: Option<u32>,
) -> Result<Vec<FilterMatch>> {
// Just delegate to the regular method for now - the real fix is in sync_filters_coordinated
self.sync_and_check_filters(num_blocks).await
}

pub async fn sync_and_check_filters(
&mut self,
num_blocks: Option<u32>,
) -> Result<Vec<FilterMatch>> {
let running = self.running.read().await;
if !*running {
return Err(SpvError::Config("Client not running".to_string()));
}
drop(running);

// Get current filter tip height to determine range (use filter headers, not block headers)
// This ensures consistency between range calculation and progress tracking
let tip_height =
self.storage.get_filter_tip_height().await.map_err(SpvError::Storage)?.unwrap_or(0);

// Determine how many blocks to request
let num_blocks = num_blocks.unwrap_or(100).max(1);
let default_start = tip_height.saturating_sub(num_blocks - 1);

// Ask the wallet for an earliest rescan height, falling back to the default window.
let wallet_hint = self.sync_manager.wallet_birth_height_hint().await;
let mut start_height = wallet_hint.unwrap_or(default_start).min(default_start);

// Respect any user-provided start height hint from the configuration.
if let Some(config_start) = self.sync_manager.config_start_height() {
let capped = config_start.min(tip_height);
start_height = start_height.max(capped);
}

// Make sure we never request past the current tip
start_height = start_height.min(tip_height);

let actual_count = if start_height <= tip_height {
tip_height - start_height + 1
} else {
0
};

tracing::info!(
"Requesting filters from height {} to {} ({} blocks based on filter tip height)",
start_height,
tip_height,
actual_count
);
if let Some(hint) = wallet_hint {
tracing::debug!("Wallet hint for earliest required height: {}", hint);
}
tracing::info!("Filter processing and matching will happen automatically in background thread as CFilter messages arrive");

// Send filter requests - processing will happen automatically in the background
if actual_count > 0 {
self.sync_filters_coordinated(start_height, actual_count).await?;
} else {
tracing::debug!("No filters requested because calculated range is empty");
}

// Return empty vector since matching happens asynchronously in the filter processor thread
// Actual matches will be processed and blocks requested automatically when CFilter messages arrive
Ok(Vec::new())
}

/// Sync filters for a specific height range.
pub async fn sync_filters_range(
&mut self,
start_height: Option<u32>,
count: Option<u32>,
) -> Result<()> {
// Get filter tip height to determine default values
let filter_tip_height =
self.storage.get_filter_tip_height().await.map_err(SpvError::Storage)?.unwrap_or(0);

let start = start_height.unwrap_or(filter_tip_height.saturating_sub(99));
let num_blocks = count.unwrap_or(100);

tracing::info!(
"Starting filter sync for specific range from height {} ({} blocks)",
start,
num_blocks
);

self.sync_filters_coordinated(start, num_blocks).await
}

/// Sync filters in coordination with the monitoring loop using flow control processing
async fn sync_filters_coordinated(&mut self, start_height: u32, count: u32) -> Result<()> {
tracing::info!("Starting coordinated filter sync with flow control from height {} to {} ({} filters expected)",
start_height, start_height + count - 1, count);

// Start tracking filter sync progress
crate::sync::filters::FilterSyncManager::<S, N>::start_filter_sync_tracking(
self.stats,
count as u64,
)
.await;

// Use the new flow control method
self.sync_manager
.filter_sync_mut()
.sync_filters_with_flow_control(
&mut *self.network,
&mut *self.storage,
Some(start_height),
Some(count),
)
.await
.map_err(SpvError::Sync)?;

let (pending_count, active_count, flow_enabled) =
self.sync_manager.filter_sync().get_flow_control_status();
tracing::info!("✅ Filter sync with flow control initiated (flow control enabled: {}, {} requests queued, {} active)",
flow_enabled, pending_count, active_count);

Ok(())
}
}
16 changes: 1 addition & 15 deletions dash-spv/src/client/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::mempool_filter::MempoolFilter;
use crate::network::NetworkManager;
use crate::storage::StorageManager;
use crate::sync::SyncManager;
use crate::types::{MempoolState, SpvEvent, SpvStats};
use crate::types::{MempoolState, SpvEvent};
// Removed local ad-hoc compact filter construction in favor of always processing full blocks
use key_wallet_manager::wallet_interface::WalletInterface;
use std::sync::Arc;
Expand All @@ -18,7 +18,6 @@ pub struct MessageHandler<'a, S: StorageManager, N: NetworkManager, W: WalletInt
storage: &'a mut S,
network: &'a mut N,
config: &'a ClientConfig,
stats: &'a Arc<RwLock<SpvStats>>,
block_processor_tx: &'a tokio::sync::mpsc::UnboundedSender<crate::client::BlockProcessingTask>,
mempool_filter: &'a Option<Arc<MempoolFilter>>,
mempool_state: &'a Arc<RwLock<MempoolState>>,
Expand All @@ -39,7 +38,6 @@ impl<
storage: &'a mut S,
network: &'a mut N,
config: &'a ClientConfig,
stats: &'a Arc<RwLock<SpvStats>>,
block_processor_tx: &'a tokio::sync::mpsc::UnboundedSender<
crate::client::BlockProcessingTask,
>,
Expand All @@ -52,7 +50,6 @@ impl<
storage,
network,
config,
stats,
block_processor_tx,
mempool_filter,
mempool_state,
Expand Down Expand Up @@ -338,17 +335,6 @@ impl<
}
NetworkMessage::CFilter(cfilter) => {
tracing::debug!("Received CFilter for block {}", cfilter.block_hash);

// Record the height of this received filter for gap tracking
crate::sync::filters::FilterSyncManager::<S, N>::record_filter_received_at_height(
self.stats,
&*self.storage,
&cfilter.block_hash,
)
.await;

// Sequential sync manager handles the filter internally
// For sequential sync, filter checking is done within the sync manager
}
NetworkMessage::SendDsq(wants_dsq) => {
tracing::info!("Received SendDsq message - peer wants DSQ messages: {}", wants_dsq);
Expand Down
3 changes: 0 additions & 3 deletions dash-spv/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
//!
//! - `block_processor.rs` (649 lines) - Block processing and validation
//! - `config.rs` (484 lines) - Client configuration
//! - `filter_sync.rs` (171 lines) - Filter synchronization
//! - `message_handler.rs` (585 lines) - Network message handling
//! - `status_display.rs` (242 lines) - Status display formatting
//!
Expand All @@ -35,7 +34,6 @@
// Existing extracted modules
pub mod block_processor;
pub mod config;
pub mod filter_sync;
pub mod interface;
pub mod message_handler;
pub mod status_display;
Expand All @@ -54,7 +52,6 @@ mod transactions;
// Re-export public types from extracted modules
pub use block_processor::{BlockProcessingTask, BlockProcessor};
pub use config::ClientConfig;
pub use filter_sync::FilterSyncCoordinator;
pub use message_handler::MessageHandler;
pub use status_display::StatusDisplay;

Expand Down
Loading