Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dash-spv-ffi/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ pub unsafe extern "C" fn dash_spv_ffi_client_new(
let storage = DiskStorageManager::new(storage_path.clone()).await;
let wallet = key_wallet_manager::wallet_manager::WalletManager::<
key_wallet::wallet::managed_wallet_info::ManagedWalletInfo,
>::new();
>::new(client_config.network);
let wallet = std::sync::Arc::new(tokio::sync::RwLock::new(wallet));

match (network, storage) {
Expand Down
8 changes: 3 additions & 5 deletions dash-spv-ffi/tests/test_wallet_manager.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
#[cfg(test)]
mod tests {
use dash_spv_ffi::*;
use dashcore::Network;
use key_wallet::wallet::initialization::WalletAccountCreationOptions;
use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo;
use key_wallet_ffi::{
wallet_manager::{
wallet_manager_free_wallet_ids, wallet_manager_get_wallet_ids,
wallet_manager_import_wallet_from_bytes, wallet_manager_wallet_count,
},
FFIError, FFINetwork, FFIWalletManager,
FFIError, FFIWalletManager,
};
use key_wallet_manager::wallet_manager::WalletManager;
use std::ffi::CStr;
Expand Down Expand Up @@ -58,12 +57,12 @@ mod tests {
let wallet_manager_ptr = wallet_manager as *mut key_wallet_ffi::FFIWalletManager;

// Prepare a serialized wallet using the native manager so we can import it
let mut native_manager = WalletManager::<ManagedWalletInfo>::new();
let mut native_manager =
WalletManager::<ManagedWalletInfo>::new((*config).get_inner().network);
let (serialized_wallet, expected_wallet_id) = native_manager
.create_wallet_from_mnemonic_return_serialized_bytes(
"abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about",
"",
Network::Dash,
0,
WalletAccountCreationOptions::Default,
false,
Expand Down Expand Up @@ -105,7 +104,6 @@ mod tests {
let mut description_error = FFIError::success();
let description_ptr = key_wallet_ffi::wallet_manager_describe(
wallet_manager_ptr as *const FFIWalletManager,
FFINetwork::Dash,
&mut description_error as *mut FFIError,
);
assert!(!description_ptr.is_null(), "describe should succeed: {:?}", description_error);
Expand Down
2 changes: 1 addition & 1 deletion dash-spv/examples/filter_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
DiskStorageManager::new("./.tmp/filter-sync-example-storage".into()).await?;

// Create wallet manager
let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new()));
let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new(config.network)));

// Create the client
let mut client = DashSpvClient::new(config, network_manager, storage_manager, wallet).await?;
Expand Down
2 changes: 1 addition & 1 deletion dash-spv/examples/simple_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
DiskStorageManager::new("./.tmp/simple-sync-example-storage".into()).await?;

// Create wallet manager
let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new()));
let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new(config.network)));

// Create the client
let mut client = DashSpvClient::new(config, network_manager, storage_manager, wallet).await?;
Expand Down
2 changes: 1 addition & 1 deletion dash-spv/examples/spv_with_wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
DiskStorageManager::new("./.tmp/spv-with-wallet-example-storage".into()).await?;

// Create wallet manager
let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new()));
let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new(config.network)));

// Create the SPV client with all components
let mut client = DashSpvClient::new(config, network_manager, storage_manager, wallet).await?;
Expand Down
14 changes: 5 additions & 9 deletions dash-spv/src/client/block_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ pub struct BlockProcessor<W: WalletInterface, S: StorageManager> {
event_tx: mpsc::UnboundedSender<SpvEvent>,
processed_blocks: HashSet<dashcore::BlockHash>,
failed: bool,
network: dashcore::Network,
}

impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync + 'static>
Expand All @@ -49,7 +48,6 @@ impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync
storage: Arc<Mutex<S>>,
stats: Arc<RwLock<SpvStats>>,
event_tx: mpsc::UnboundedSender<SpvEvent>,
network: dashcore::Network,
) -> Self {
Self {
receiver,
Expand All @@ -59,7 +57,6 @@ impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync
event_tx,
processed_blocks: HashSet::new(),
failed: false,
network,
}
}

Expand Down Expand Up @@ -177,8 +174,7 @@ impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync
} => {
// Check compact filter with wallet
let mut wallet = self.wallet.write().await;
let matches =
wallet.check_compact_filter(&filter, &block_hash, self.network).await;
let matches = wallet.check_compact_filter(&filter, &block_hash).await;

if matches {
tracing::info!("🎯 Compact filter matched for block {}", block_hash);
Expand All @@ -191,7 +187,7 @@ impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync
tracing::debug!(
"Compact filter did not match for block {}, {}",
block_hash,
wallet.describe(self.network).await
wallet.describe().await
);
drop(wallet);
}
Expand Down Expand Up @@ -226,7 +222,7 @@ impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync

// Process block with wallet
let mut wallet = self.wallet.write().await;
let txids = wallet.process_block(&block, height, self.network).await;
let txids = wallet.process_block(&block, height).await;
if !txids.is_empty() {
tracing::info!(
"🎯 Wallet found {} relevant transactions in block {} at height {}",
Expand All @@ -245,7 +241,7 @@ impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync
for txid in &txids {
if let Some(tx) = block.txdata.iter().find(|t| &t.txid() == txid) {
// Ask the wallet for the precise effect of this transaction
let effect = wallet.transaction_effect(tx, self.network).await;
let effect = wallet.transaction_effect(tx).await;
if let Some((net_amount, affected_addresses)) = effect {
tracing::info!("📤 Emitting TransactionDetected event for {}", txid);
let _ = self.event_tx.send(SpvEvent::TransactionDetected {
Expand Down Expand Up @@ -291,7 +287,7 @@ impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync

// Let the wallet process the mempool transaction
let mut wallet = self.wallet.write().await;
wallet.process_mempool_transaction(&tx, self.network).await;
wallet.process_mempool_transaction(&tx).await;
drop(wallet);

// TODO: Check if transaction affects watched addresses/scripts
Expand Down
55 changes: 10 additions & 45 deletions dash-spv/src/client/block_processor_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,47 +40,33 @@ mod tests {

#[async_trait::async_trait]
impl key_wallet_manager::wallet_interface::WalletInterface for MockWallet {
async fn process_block(
&mut self,
block: &Block,
height: u32,
_network: Network,
) -> Vec<dashcore::Txid> {
async fn process_block(&mut self, block: &Block, height: u32) -> Vec<dashcore::Txid> {
let mut processed = self.processed_blocks.lock().await;
processed.push((block.block_hash(), height));

// Return txids of all transactions in block as "relevant"
block.txdata.iter().map(|tx| tx.txid()).collect()
}

async fn process_mempool_transaction(&mut self, tx: &Transaction, _network: Network) {
async fn process_mempool_transaction(&mut self, tx: &Transaction) {
let mut processed = self.processed_transactions.lock().await;
processed.push(tx.txid());
}

async fn handle_reorg(&mut self, _from_height: u32, _to_height: u32, _network: Network) {
// Not tested here
}

async fn check_compact_filter(
&mut self,
_filter: &dashcore::bip158::BlockFilter,
_block_hash: &dashcore::BlockHash,
_network: Network,
) -> bool {
// Return true for all filters in test
true
}

async fn describe(&self, _network: Network) -> String {
async fn describe(&self) -> String {
"MockWallet (test implementation)".to_string()
}

async fn transaction_effect(
&self,
tx: &Transaction,
_network: Network,
) -> Option<(i64, Vec<String>)> {
async fn transaction_effect(&self, tx: &Transaction) -> Option<(i64, Vec<String>)> {
let map = self.effects.lock().await;
map.get(&tx.txid()).cloned()
}
Expand All @@ -104,14 +90,8 @@ mod tests {
let storage = Arc::new(Mutex::new(
DiskStorageManager::with_temp_dir().await.expect("Failed to create tmp storage"),
));
let processor = BlockProcessor::new(
task_rx,
wallet.clone(),
storage.clone(),
stats,
event_tx,
Network::Dash,
);
let processor =
BlockProcessor::new(task_rx, wallet.clone(), storage.clone(), stats, event_tx);

(processor, task_tx, event_rx, wallet, storage)
}
Expand Down Expand Up @@ -268,36 +248,22 @@ mod tests {

#[async_trait::async_trait]
impl key_wallet_manager::wallet_interface::WalletInterface for NonMatchingWallet {
async fn process_block(
&mut self,
_block: &Block,
_height: u32,
_network: Network,
) -> Vec<dashcore::Txid> {
async fn process_block(&mut self, _block: &Block, _height: u32) -> Vec<dashcore::Txid> {
Vec::new()
}

async fn process_mempool_transaction(&mut self, _tx: &Transaction, _network: Network) {}

async fn handle_reorg(
&mut self,
_from_height: u32,
_to_height: u32,
_network: Network,
) {
}
async fn process_mempool_transaction(&mut self, _tx: &Transaction) {}

async fn check_compact_filter(
&mut self,
_filter: &dashcore::bip158::BlockFilter,
_block_hash: &dashcore::BlockHash,
_network: Network,
) -> bool {
// Always return false - filter doesn't match
false
}

async fn describe(&self, _network: Network) -> String {
async fn describe(&self) -> String {
"NonMatchingWallet (test implementation)".to_string()
}
}
Expand All @@ -310,8 +276,7 @@ mod tests {
DiskStorageManager::with_temp_dir().await.expect("Failed to create tmp storage"),
));

let processor =
BlockProcessor::new(task_rx, wallet, storage, stats, event_tx, Network::Dash);
let processor = BlockProcessor::new(task_rx, wallet, storage, stats, event_tx);

let block_hash = create_test_block(Network::Dash).block_hash();
let filter_data = vec![1, 2, 3, 4, 5];
Expand Down
1 change: 0 additions & 1 deletion dash-spv/src/client/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ impl<
self.storage.clone(),
self.stats.clone(),
self.event_tx.clone(),
self.config.network,
);

tokio::spawn(async move {
Expand Down
4 changes: 2 additions & 2 deletions dash-spv/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ mod tests {
let network_manager = MockNetworkManager::new();
let storage =
DiskStorageManager::with_temp_dir().await.expect("Failed to create tmp storage");
let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new()));
let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new(config.network)));

let client = DashSpvClient::new(config, network_manager, storage, wallet)
.await
Expand Down Expand Up @@ -125,7 +125,7 @@ mod tests {
let network_manager = MockNetworkManager::new();
let storage =
DiskStorageManager::with_temp_dir().await.expect("Failed to create tmp storage");
let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new()));
let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new(config.network)));

let mut client = DashSpvClient::new(config, network_manager, storage, wallet)
.await
Expand Down
2 changes: 1 addition & 1 deletion dash-spv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
//! // Create the required components
//! let network = PeerNetworkManager::new(&config).await?;
//! let storage = DiskStorageManager::new("./.tmp/example-storage".into()).await?;
//! let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new()));
//! let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new(config.network)));
//!
//! // Create and start the client
//! let mut client = DashSpvClient::new(config.clone(), network, storage, wallet).await?;
Expand Down
36 changes: 5 additions & 31 deletions dash-spv/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,10 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
tracing::info!("Sync strategy: Sequential");

// Create the wallet manager
let mut wallet_manager = WalletManager::<ManagedWalletInfo>::new();
let mut wallet_manager = WalletManager::<ManagedWalletInfo>::new(config.network);
let wallet_id = wallet_manager.create_wallet_from_mnemonic(
mnemonic_phrase.as_str(),
"",
network,
0,
key_wallet::wallet::initialization::WalletAccountCreationOptions::default(),
)?;
Expand Down Expand Up @@ -415,7 +414,6 @@ async fn run_client<S: dash_spv::storage::StorageManager + Send + Sync + 'static
// Take the client's event receiver and spawn a logger task
if let Some(mut event_rx) = client.take_event_receiver() {
let wallet_for_logger = wallet.clone();
let network_for_logger = config.network;
let wallet_id_for_logger = wallet_id;
tokio::spawn(async move {
use dash_spv::types::SpvEvent;
Expand Down Expand Up @@ -453,16 +451,11 @@ async fn run_client<S: dash_spv::storage::StorageManager + Send + Sync + 'static
_ = tokio::time::sleep(snapshot_interval) => {
// Log snapshot if interval has elapsed
if last_snapshot.elapsed() >= snapshot_interval {
let (tx_count, wallet_affecting_tx_count, confirmed, unconfirmed, locked, total, derived_incoming) = {
let (tx_count, confirmed, unconfirmed, locked, total) = {
let mgr = wallet_for_logger.read().await;
// Count transactions via network state for the selected network
let txs = mgr
.get_network_state(network_for_logger)
.map(|ns| ns.transactions.len())
.unwrap_or(0);

// Count wallet-affecting transactions from wallet transaction history
let wallet_affecting = mgr
let tx_count = mgr
.wallet_transaction_history(&wallet_id_for_logger)
.map(|v| v.len())
.unwrap_or(0);
Expand All @@ -471,36 +464,17 @@ async fn run_client<S: dash_spv::storage::StorageManager + Send + Sync + 'static
let wb = mgr.get_wallet_balance(&wallet_id_for_logger).ok();
let (c, u, l, t) = wb.map(|b| (b.confirmed, b.unconfirmed, b.locked, b.total)).unwrap_or((0, 0, 0, 0));

// Derive a conservative incoming total by summing tx outputs to our addresses.
let incoming_sum = if let Some(ns) = mgr.get_network_state(network_for_logger) {
let addrs = mgr.monitored_addresses();
let addr_set: std::collections::HashSet<_> = addrs.into_iter().collect();
let mut sum_incoming: u64 = 0;
for rec in ns.transactions.values() {
for out in &rec.transaction.output {
if let Ok(out_addr) = dashcore::Address::from_script(&out.script_pubkey, network_for_logger) {
if addr_set.contains(&out_addr) {
sum_incoming = sum_incoming.saturating_add(out.value);
}
}
}
}
sum_incoming
} else { 0 };

(txs, wallet_affecting, c, u, l, t, incoming_sum)
(tx_count, c, u, l, t)
};
tracing::info!(
"Wallet tx summary: detected={} (blocks={} + mempool={}), affecting_wallet={}, balances: confirmed={} unconfirmed={} locked={} total={}, derived_incoming_total={} (approx)",
"Wallet tx summary: tx_count={} (blocks={} + mempool={}), balances: confirmed={} unconfirmed={} locked={} total={}",
tx_count,
total_detected_block_txs,
total_detected_mempool_txs,
wallet_affecting_tx_count,
confirmed,
unconfirmed,
locked,
total,
derived_incoming
);
last_snapshot = std::time::Instant::now();
}
Expand Down
3 changes: 1 addition & 2 deletions dash-spv/src/sync/filters/matching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
filter_data: &[u8],
block_hash: &BlockHash,
wallet: &mut W,
network: dashcore::Network,
) -> SyncResult<bool> {
// Create the BlockFilter from the raw data
let filter = dashcore::bip158::BlockFilter::new(filter_data);

// Use wallet's check_compact_filter method
let matches = wallet.check_compact_filter(&filter, block_hash, network).await;
let matches = wallet.check_compact_filter(&filter, block_hash).await;
if matches {
tracing::info!("🎯 Filter match found for block {}", block_hash);
Ok(true)
Expand Down
Loading