diff --git a/dash-spv/src/storage/disk/segments.rs b/dash-spv/src/storage/disk/segments.rs index 4096db52e..da717cde1 100644 --- a/dash-spv/src/storage/disk/segments.rs +++ b/dash-spv/src/storage/disk/segments.rs @@ -256,7 +256,7 @@ impl SegmentCache { let end_segment = Self::index_to_segment_id(storage_end_idx); for segment_id in start_segment..=end_segment { - let segment = self.get_segment(&segment_id).await?; + let segment = self.get_segment_mut(&segment_id).await?; let item_count = segment.items.len() as u32; let seg_start_idx = if segment_id == start_segment { @@ -271,11 +271,7 @@ impl SegmentCache { item_count }; - if seg_start_idx < item_count && seg_end_idx <= item_count { - items.extend_from_slice( - &segment.items[seg_start_idx as usize..seg_end_idx as usize], - ); - } + items.extend_from_slice(segment.get(seg_start_idx..seg_end_idx)); } Ok(items) @@ -394,11 +390,14 @@ pub struct Segment { impl Segment { const ITEMS_PER_SEGMENT: u32 = 50_000; - fn new(segment_id: u32, items: Vec) -> Self { + fn new(segment_id: u32, mut items: Vec, state: SegmentState) -> Self { + debug_assert!(items.len() <= Self::ITEMS_PER_SEGMENT as usize); + items.truncate(Self::ITEMS_PER_SEGMENT as usize); + Self { segment_id, items, - state: SegmentState::Clean, + state, last_accessed: Instant::now(), } } @@ -407,7 +406,7 @@ impl Segment { // Load segment from disk let segment_path = base_path.join(I::relative_disk_path(segment_id)); - let items = if segment_path.exists() { + let (items, state) = if segment_path.exists() { let file = File::open(&segment_path)?; let mut reader = BufReader::new(file); let mut items = Vec::with_capacity(Segment::::ITEMS_PER_SEGMENT as usize); @@ -429,12 +428,12 @@ impl Segment { } } - items + (items, SegmentState::Clean) } else { - Vec::with_capacity(Self::ITEMS_PER_SEGMENT as usize) + (Vec::with_capacity(Self::ITEMS_PER_SEGMENT as usize), SegmentState::Dirty) }; - Ok(Self::new(segment_id, items)) + Ok(Self::new(segment_id, items, state)) } pub fn persist(&mut self, base_path: &Path) -> StorageResult<()> { @@ -466,6 +465,8 @@ impl Segment { } pub fn insert(&mut self, item: I, offset: u32) { + debug_assert!(offset < Self::ITEMS_PER_SEGMENT); + let offset = offset as usize; debug_assert!(offset <= self.items.len()); @@ -476,7 +477,8 @@ impl Segment { self.items.push(item); } else { tracing::error!( - "Tried to store an item out of the allowed bounds in segment with id {}", + "Tried to store an item out of the allowed bounds (offset {}) in segment with id {}", + offset, self.segment_id ); } @@ -485,4 +487,209 @@ impl Segment { self.state = SegmentState::Dirty; self.last_accessed = std::time::Instant::now(); } + + pub fn get(&mut self, range: Range) -> &[I] { + self.last_accessed = std::time::Instant::now(); + + if range.start as usize >= self.items.len() { + return &[]; + }; + + let end = range.end.min(self.items.len() as u32); + + &self.items[range.start as usize..end as usize] + } +} + +#[cfg(test)] +mod tests { + use dashcore_hashes::Hash; + use tempfile::TempDir; + + use super::*; + + trait TestStruct { + fn new_test(id: u32) -> Self; + } + + impl TestStruct for FilterHeader { + fn new_test(id: u32) -> Self { + let mut bytes = [0u8; 32]; + bytes[0..4].copy_from_slice(&id.to_le_bytes()); + FilterHeader::from_raw_hash(dashcore_hashes::sha256d::Hash::from_byte_array(bytes)) + } + } + + #[tokio::test] + async fn test_segment_cache_eviction() { + let tmp_dir = TempDir::new().unwrap(); + + const MAX_SEGMENTS: u32 = SegmentCache::::MAX_ACTIVE_SEGMENTS as u32; + + let mut cache = SegmentCache::::new(tmp_dir.path()) + .await + .expect("Failed to create new segment_cache"); + + // This logic is a little tricky. Each cache can contain up to MAX_SEGMENTS segments in memory. + // By storing MAX_SEGMENTS + 1 items, we ensure that the cache will evict the first introduced. + // Then, by asking again in order starting in 0, we force the cache to load the evicted segment + // from disk, evicting at the same time the next, 1 in this case. Then we ask for the 1 that we + // know is evicted and so on. + + for i in 0..=MAX_SEGMENTS { + let segment = cache.get_segment_mut(&i).await.expect("Failed to create a new segment"); + assert!(segment.items.is_empty()); + assert!(segment.state == SegmentState::Dirty); + + segment.items = vec![FilterHeader::new_test(i)]; + } + + for i in 0..=MAX_SEGMENTS { + assert_eq!(cache.segments.len(), MAX_SEGMENTS as usize); + + let segment = cache.get_segment_mut(&i).await.expect("Failed to load segment"); + + assert_eq!(segment.items.len(), 1); + assert_eq!(segment.get(0..1), [FilterHeader::new_test(i)]); + assert!(segment.state == SegmentState::Clean); + } + } + + #[tokio::test] + async fn test_segment_cache_persist_load() { + let tmp_dir = TempDir::new().unwrap(); + + let items: Vec<_> = (0..10).map(FilterHeader::new_test).collect(); + + let mut cache = SegmentCache::::new(tmp_dir.path()) + .await + .expect("Failed to create new segment_cache"); + + let segment = cache.get_segment_mut(&0).await.expect("Failed to create a new segment"); + + assert_eq!(segment.state, SegmentState::Dirty); + segment.items = items.clone(); + + assert!(segment.persist(tmp_dir.path()).is_ok()); + + cache.clear_in_memory(); + assert!(cache.segments.is_empty()); + + let segment = cache.get_segment(&0).await.expect("Failed to load segment"); + + assert_eq!(segment.items, items); + assert_eq!(segment.state, SegmentState::Clean); + + cache.clear_all().await.expect("Failed to clean on-memory and on-disk data"); + assert!(cache.segments.is_empty()); + + let segment = cache.get_segment(&0).await.expect("Failed to create a new segment"); + + assert!(segment.items.is_empty()); + assert_eq!(segment.state, SegmentState::Dirty); + } + + #[tokio::test] + async fn test_segment_cache_get_insert() { + let tmp_dir = TempDir::new().unwrap(); + + const ITEMS_PER_SEGMENT: u32 = Segment::::ITEMS_PER_SEGMENT; + + let mut cache = SegmentCache::::new(tmp_dir.path()) + .await + .expect("Failed to create new segment_cache"); + + let items = cache + .get_items(0..ITEMS_PER_SEGMENT) + .await + .expect("segment cache couldn't return items"); + + assert!(items.is_empty()); + + let items = cache + .get_items(0..ITEMS_PER_SEGMENT + 1) + .await + .expect("segment cache couldn't return items"); + + assert!(items.is_empty()); + + // Cannot test the store logic bcs it depends on the DiskStorageManager, test that struct properly or + // remove the necessity of it + } + + #[tokio::test] + async fn test_segment_persist_load() { + let tmp_dir = TempDir::new().unwrap(); + + let segment_id = 10; + + const MAX_ITEMS: u32 = Segment::::ITEMS_PER_SEGMENT; + + // Testing with half full segment + let items = (0..MAX_ITEMS / 2).map(FilterHeader::new_test).collect(); + let mut segment = Segment::new(segment_id, items, SegmentState::Dirty); + + assert_eq!(segment.get(MAX_ITEMS..MAX_ITEMS + 1), []); + assert_eq!( + segment.get(0..MAX_ITEMS / 2), + &(0..MAX_ITEMS / 2).map(FilterHeader::new_test).collect::>() + ); + assert_eq!( + segment.get(MAX_ITEMS / 2 - 1..MAX_ITEMS / 2), + [FilterHeader::new_test(MAX_ITEMS / 2 - 1)] + ); + assert_eq!(segment.get(MAX_ITEMS / 2..MAX_ITEMS / 2 + 1), []); + assert_eq!(segment.get(MAX_ITEMS - 1..MAX_ITEMS), []); + + assert_eq!(segment.state, SegmentState::Dirty); + assert!(segment.persist(tmp_dir.path()).is_ok()); + assert_eq!(segment.state, SegmentState::Clean); + + let mut loaded_segment = + Segment::::load(tmp_dir.path(), segment_id).await.unwrap(); + + assert_eq!( + loaded_segment.get(MAX_ITEMS..MAX_ITEMS + 1), + segment.get(MAX_ITEMS..MAX_ITEMS + 1) + ); + assert_eq!(loaded_segment.get(0..1), segment.get(0..1)); + assert_eq!( + loaded_segment.get(MAX_ITEMS / 2 - 1..MAX_ITEMS / 2), + segment.get(MAX_ITEMS / 2 - 1..MAX_ITEMS / 2) + ); + assert_eq!( + loaded_segment.get(MAX_ITEMS / 2..MAX_ITEMS / 2 + 1), + segment.get(MAX_ITEMS / 2..MAX_ITEMS / 2 + 1) + ); + assert_eq!( + loaded_segment.get(MAX_ITEMS - 1..MAX_ITEMS), + segment.get(MAX_ITEMS - 1..MAX_ITEMS) + ); + } + + #[test] + fn test_segment_insert_get() { + let segment_id = 10; + + const MAX_ITEMS: u32 = Segment::::ITEMS_PER_SEGMENT; + + let items = (0..10).map(FilterHeader::new_test).collect(); + + let mut segment = Segment::new(segment_id, items, SegmentState::Dirty); + + assert_eq!(segment.items.len(), 10); + assert_eq!( + segment.get(0..MAX_ITEMS + 1), + &(0..10).map(FilterHeader::new_test).collect::>() + ); + + segment.insert(FilterHeader::new_test(4), 4); + segment.insert(FilterHeader::new_test(10), 10); + + assert_eq!(segment.items.len(), 11); + assert_eq!( + segment.get(0..MAX_ITEMS + 1), + &(0..11).map(FilterHeader::new_test).collect::>() + ); + } }