Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dash-spv/benches/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use dash_spv::{
storage::{DiskStorageManager, StorageManager},
storage::{BlockHeaderStorage, DiskStorageManager, StorageManager},
Hash,
};
use dashcore::{block::Version, BlockHash, CompactTarget, Header};
Expand Down
3 changes: 1 addition & 2 deletions dash-spv/examples/filter_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let network_manager = PeerNetworkManager::new(&config).await?;

// Create storage manager
let storage_manager =
DiskStorageManager::new("./.tmp/filter-sync-example-storage".into()).await?;
let storage_manager = DiskStorageManager::new("./.tmp/filter-sync-example-storage").await?;

// Create wallet manager
let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new(config.network)));
Expand Down
3 changes: 1 addition & 2 deletions dash-spv/examples/simple_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let network_manager = PeerNetworkManager::new(&config).await?;

// Create storage manager
let storage_manager =
DiskStorageManager::new("./.tmp/simple-sync-example-storage".into()).await?;
let storage_manager = DiskStorageManager::new("./.tmp/simple-sync-example-storage").await?;

// Create wallet manager
let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new(config.network)));
Expand Down
3 changes: 1 addition & 2 deletions dash-spv/examples/spv_with_wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let network_manager = PeerNetworkManager::new(&config).await?;

// Create storage manager - use disk storage for persistence
let storage_manager =
DiskStorageManager::new("./.tmp/spv-with-wallet-example-storage".into()).await?;
let storage_manager = DiskStorageManager::new("./.tmp/spv-with-wallet-example-storage").await?;

// Create wallet manager
let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new(config.network)));
Expand Down
2 changes: 1 addition & 1 deletion dash-spv/src/client/block_processor_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
mod tests {
use crate::client::block_processor::{BlockProcessingTask, BlockProcessor};

use crate::storage::DiskStorageManager;
use crate::storage::{BlockHeaderStorage, DiskStorageManager};
use crate::types::{SpvEvent, SpvStats};
use dashcore::{blockdata::constants::genesis_block, Block, Network, Transaction};

Expand Down
2 changes: 1 addition & 1 deletion dash-spv/src/client/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
// Shutdown storage to ensure all data is persisted
{
let mut storage = self.storage.lock().await;
storage.shutdown().await.map_err(SpvError::Storage)?;
storage.shutdown().await;
tracing::info!("Storage shutdown completed - all data persisted");
}

Expand Down
2 changes: 1 addition & 1 deletion dash-spv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
//!
//! // Create the required components
//! let network = PeerNetworkManager::new(&config).await?;
//! let storage = DiskStorageManager::new("./.tmp/example-storage".into()).await?;
//! let storage = DiskStorageManager::new("./.tmp/example-storage").await?;
//! let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new(config.network)));
//!
//! // Create and start the client
Expand Down
180 changes: 180 additions & 0 deletions dash-spv/src/storage/blocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
//! Header storage operations for DiskStorageManager.

use std::collections::HashMap;
use std::ops::Range;
use std::path::PathBuf;

use async_trait::async_trait;
use dashcore::block::Header as BlockHeader;
use dashcore::BlockHash;
use tokio::sync::RwLock;

use crate::error::StorageResult;
use crate::storage::io::atomic_write;
use crate::storage::segments::SegmentCache;
use crate::storage::PersistentStorage;
use crate::StorageError;

/// Read/write access to a contiguous run of block headers kept by a storage
/// backend, addressed by absolute block height.
#[async_trait]
pub trait BlockHeaderStorage {
    /// Appends `headers` immediately after the current tip.
    async fn store_headers(&mut self, headers: &[BlockHeader]) -> StorageResult<()>;

    /// Stores `headers` as a contiguous run whose first element sits at `height`.
    async fn store_headers_at_height(
        &mut self,
        headers: &[BlockHeader],
        height: u32,
    ) -> StorageResult<()>;

    /// Loads the headers covering the half-open height `range`.
    async fn load_headers(&self, range: Range<u32>) -> StorageResult<Vec<BlockHeader>>;

    /// Loads a single header, or `None` when `height` lies outside the stored
    /// span (or nothing is stored at all).
    async fn get_header(&self, height: u32) -> StorageResult<Option<BlockHeader>> {
        // Guard clauses: answer out-of-range queries without touching the
        // backing store. Bounds are checked in the same order as before so an
        // above-tip request never queries the start height.
        let Some(tip_height) = self.get_tip_height().await else {
            return Ok(None);
        };
        if height > tip_height {
            return Ok(None);
        }

        let Some(start_height) = self.get_start_height().await else {
            return Ok(None);
        };
        if height < start_height {
            return Ok(None);
        }

        Ok(self.load_headers(height..height + 1).await?.first().copied())
    }

    /// Height of the highest stored header, if any.
    async fn get_tip_height(&self) -> Option<u32>;

    /// Height of the lowest stored header, if any.
    async fn get_start_height(&self) -> Option<u32>;

    /// Number of headers currently stored.
    async fn get_stored_headers_len(&self) -> u32;

    /// Looks up the height a header was stored at, by its block hash.
    async fn get_header_height_by_hash(
        &self,
        hash: &dashcore::BlockHash,
    ) -> StorageResult<Option<u32>>;
}

/// Disk-backed implementation of [`BlockHeaderStorage`].
pub struct PersistentBlockHeaderStorage {
    // Segment-file cache holding the headers themselves, addressed by height.
    block_headers: RwLock<SegmentCache<BlockHeader>>,
    // In-memory hash -> height index; persisted to `index.dat` and rebuilt
    // from the segment files when the index file is missing or corrupt.
    header_hash_index: HashMap<BlockHash, u32>,
}

impl PersistentBlockHeaderStorage {
    // Sub-directory (under the storage root) that holds the header segments.
    const FOLDER_NAME: &str = "block_headers";
    // File inside `FOLDER_NAME` persisting the hash -> height index.
    const INDEX_FILE_NAME: &str = "index.dat";
}

#[async_trait]
impl PersistentStorage for PersistentBlockHeaderStorage {
    /// Opens (or initializes) header storage rooted at `storage_path`.
    ///
    /// The hash index is loaded from `index.dat` when present and parseable;
    /// otherwise it is rebuilt by scanning existing segment files, or starts
    /// empty when no segment folder exists yet.
    async fn open(storage_path: impl Into<PathBuf> + Send) -> StorageResult<Self> {
        let storage_path = storage_path.into();
        let segments_folder = storage_path.join(Self::FOLDER_NAME);

        let index_path = segments_folder.join(Self::INDEX_FILE_NAME);

        let mut block_headers = SegmentCache::load_or_new(&segments_folder).await?;

        // A missing or corrupt index file is deliberately non-fatal: both the
        // read error and the deserialize error collapse to `None` and trigger
        // a rebuild instead.
        let header_hash_index = match tokio::fs::read(&index_path)
            .await
            .ok()
            .and_then(|content| bincode::deserialize(&content).ok())
        {
            Some(index) => index,
            _ => {
                // NOTE(review): `Path::exists` is a blocking std call inside an
                // async fn — consider `tokio::fs::try_exists`.
                if segments_folder.exists() {
                    block_headers.build_block_index_from_segments().await?
                } else {
                    HashMap::new()
                }
            }
        };

        Ok(Self {
            block_headers: RwLock::new(block_headers),
            header_hash_index,
        })
    }

    /// Flushes the segment cache and the hash index to disk.
    async fn persist(&mut self, storage_path: impl Into<PathBuf> + Send) -> StorageResult<()> {
        let block_headers_folder = storage_path.into().join(Self::FOLDER_NAME);
        let index_path = block_headers_folder.join(Self::INDEX_FILE_NAME);

        tokio::fs::create_dir_all(&block_headers_folder).await?;

        // NOTE(review): the value returned by `SegmentCache::persist` (if any)
        // is discarded here — confirm it returns `()` and cannot fail silently.
        self.block_headers.write().await.persist(&block_headers_folder).await;

        let data = bincode::serialize(&self.header_hash_index)
            .map_err(|e| StorageError::WriteFailed(format!("Failed to serialize index: {}", e)))?;

        // Atomic write so a crash mid-write never leaves a truncated index.
        atomic_write(&index_path, &data).await
    }
}

#[async_trait]
impl BlockHeaderStorage for PersistentBlockHeaderStorage {
    /// Appends `headers` immediately after the current tip.
    async fn store_headers(&mut self, headers: &[BlockHeader]) -> StorageResult<()> {
        let height = self.block_headers.read().await.next_height();
        self.store_headers_at_height(headers, height).await
    }

    /// Stores `headers` as a contiguous run starting at `height` and records
    /// each header's hash -> height mapping in the in-memory index.
    ///
    /// The index is only updated after the segment write succeeds, so a failed
    /// store leaves the index untouched.
    async fn store_headers_at_height(
        &mut self,
        headers: &[BlockHeader],
        height: u32,
    ) -> StorageResult<()> {
        self.block_headers.write().await.store_items_at_height(headers, height).await?;

        // Index each header at its absolute height. Using `enumerate` avoids
        // the intermediate Vec of hashes and the manually incremented counter
        // the previous version used.
        for (offset, header) in headers.iter().enumerate() {
            self.header_hash_index.insert(header.block_hash(), height + offset as u32);
        }

        Ok(())
    }

    /// Loads the headers covering the half-open height `range`.
    async fn load_headers(&self, range: Range<u32>) -> StorageResult<Vec<BlockHeader>> {
        // NOTE(review): takes the write lock despite `&self` — presumably
        // `get_items` mutates the cache (lazy segment loading); confirm, since
        // this serializes concurrent readers.
        self.block_headers.write().await.get_items(range).await
    }

    /// Height of the highest stored header, if any.
    async fn get_tip_height(&self) -> Option<u32> {
        self.block_headers.read().await.tip_height()
    }

    /// Height of the lowest stored header, if any.
    async fn get_start_height(&self) -> Option<u32> {
        self.block_headers.read().await.start_height()
    }

    /// Number of stored headers; 0 when the store is empty.
    async fn get_stored_headers_len(&self) -> u32 {
        let block_headers = self.block_headers.read().await;

        // The stored span is inclusive on both ends, hence the +1.
        // NOTE(review): assumes tip >= start whenever both are Some.
        match (block_headers.start_height(), block_headers.tip_height()) {
            (Some(start_height), Some(tip_height)) => tip_height - start_height + 1,
            _ => 0,
        }
    }

    /// Looks up the height a header was stored at, by its block hash.
    async fn get_header_height_by_hash(
        &self,
        hash: &dashcore::BlockHash,
    ) -> StorageResult<Option<u32>> {
        Ok(self.header_hash_index.get(hash).copied())
    }
}
101 changes: 101 additions & 0 deletions dash-spv/src/storage/chainstate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use std::path::PathBuf;

use async_trait::async_trait;

use crate::{
error::StorageResult,
storage::{io::atomic_write, PersistentStorage},
ChainState,
};

/// Persistence of the lightweight SPV chain state (chainlock, filter tip,
/// masternode-diff and sync-base heights).
#[async_trait]
pub trait ChainStateStorage {
    /// Writes `state` through to the backing store.
    async fn store_chain_state(&mut self, state: &ChainState) -> StorageResult<()>;

    /// Loads the stored chain state, or `None` when nothing has been stored yet.
    async fn load_chain_state(&self) -> StorageResult<Option<ChainState>>;
}

/// Disk-backed implementation of [`ChainStateStorage`] that keeps the chain
/// state as a single JSON file under the storage root.
pub struct PersistentChainStateStorage {
    // Root directory; the state file lives at `<root>/chainstate/chainstate.json`.
    storage_path: PathBuf,
}

impl PersistentChainStateStorage {
    // Sub-directory (under the storage root) that holds the state file.
    const FOLDER_NAME: &str = "chainstate";
    // JSON file name for the serialized chain state.
    const FILE_NAME: &str = "chainstate.json";
}

#[async_trait]
impl PersistentStorage for PersistentChainStateStorage {
    /// Opens chain-state storage rooted at `storage_path`. No I/O happens
    /// here; files are created lazily on the first store.
    async fn open(storage_path: impl Into<PathBuf> + Send) -> StorageResult<Self> {
        let storage_path = storage_path.into();
        Ok(Self { storage_path })
    }

    /// No-op: `store_chain_state` already writes through to disk on every call,
    /// so there is nothing left to flush.
    async fn persist(&mut self, _storage_path: impl Into<PathBuf> + Send) -> StorageResult<()> {
        Ok(())
    }
}

#[async_trait]
impl ChainStateStorage for PersistentChainStateStorage {
    /// Serializes the persistable subset of `state` to
    /// `chainstate/chainstate.json` under the storage root.
    ///
    /// `masternode_engine` is intentionally not persisted; it is rebuilt at
    /// runtime (see `load_chain_state`).
    async fn store_chain_state(&mut self, state: &ChainState) -> StorageResult<()> {
        let state_data = serde_json::json!({
            "last_chainlock_height": state.last_chainlock_height,
            "last_chainlock_hash": state.last_chainlock_hash,
            "current_filter_tip": state.current_filter_tip,
            "last_masternode_diff_height": state.last_masternode_diff_height,
            "sync_base_height": state.sync_base_height,
        });

        let chainstate_folder = self.storage_path.join(Self::FOLDER_NAME);
        let path = chainstate_folder.join(Self::FILE_NAME);

        tokio::fs::create_dir_all(chainstate_folder).await?;

        // Atomic write so a crash mid-write never leaves a truncated file.
        let json = state_data.to_string();
        atomic_write(&path, json.as_bytes()).await?;

        Ok(())
    }

    /// Loads the stored chain state, or `Ok(None)` when no state file exists.
    async fn load_chain_state(&self) -> StorageResult<Option<ChainState>> {
        let path = self.storage_path.join(Self::FOLDER_NAME).join(Self::FILE_NAME);

        // Read directly and map NotFound to "no state yet". This replaces the
        // previous blocking `Path::exists()` call inside an async fn and also
        // removes the TOCTOU race between the existence check and the read.
        let content = match tokio::fs::read_to_string(&path).await {
            Ok(content) => content,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
            Err(e) => return Err(e.into()),
        };

        let value: serde_json::Value = serde_json::from_str(&content).map_err(|e| {
            crate::error::StorageError::Serialization(format!("Failed to parse chain state: {}", e))
        })?;

        // Hash fields are stored as strings; unparseable values degrade to
        // `None` rather than failing the whole load.
        let state = ChainState {
            last_chainlock_height: value
                .get("last_chainlock_height")
                .and_then(|v| v.as_u64())
                .map(|h| h as u32),
            last_chainlock_hash: value
                .get("last_chainlock_hash")
                .and_then(|v| v.as_str())
                .and_then(|s| s.parse().ok()),
            current_filter_tip: value
                .get("current_filter_tip")
                .and_then(|v| v.as_str())
                .and_then(|s| s.parse().ok()),
            // Runtime-only; never persisted.
            masternode_engine: None,
            last_masternode_diff_height: value
                .get("last_masternode_diff_height")
                .and_then(|v| v.as_u64())
                .map(|h| h as u32),
            sync_base_height: value
                .get("sync_base_height")
                .and_then(|v| v.as_u64())
                .map(|h| h as u32)
                .unwrap_or(0),
        };

        Ok(Some(state))
    }
}
Loading
Loading