Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/cid_collections/hash_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl<V> CidHashMap<V> {
/// Gets the given key's corresponding entry in the map for in-place manipulation.
///
/// See also [`HashMap::entry`].
#[allow(dead_code)]
pub fn entry(&mut self, key: Cid) -> Entry<V> {
match MaybeCompactedCid::from(key) {
MaybeCompactedCid::Compact(c) => match self.compact.entry(c) {
Expand All @@ -133,6 +134,7 @@ impl<V> CidHashMap<V> {
/// A view into a single entry in a map, which may either be vacant or occupied.
///
/// This `enum` is constructed using [`CidHashMap::entry`].
#[allow(dead_code)]
#[derive(Debug)]
pub enum Entry<'a, V: 'a> {
/// An occupied entry.
Expand All @@ -145,6 +147,7 @@ pub enum Entry<'a, V: 'a> {
/// It is part of the [`Entry`] enum.
///
/// See also [`std::collections::hash_map::OccupiedEntry`].
#[allow(dead_code)]
#[derive(Debug)]
pub struct OccupiedEntry<'a, V> {
inner: OccupiedEntryInner<'a, V>,
Expand All @@ -154,6 +157,7 @@ impl<V> OccupiedEntry<'_, V> {
/// Gets a reference to the value in the entry.
///
/// See also [`std::collections::hash_map::OccupiedEntry::get`].
#[allow(dead_code)]
pub fn get(&self) -> &V {
match &self.inner {
OccupiedEntryInner::Compact(c) => c.get(),
Expand All @@ -163,6 +167,7 @@ impl<V> OccupiedEntry<'_, V> {
}

/// Hides compaction from users.
#[allow(dead_code)]
#[derive(Debug)]
enum OccupiedEntryInner<'a, V> {
Compact(StdOccupiedEntry<'a, CidV1DagCborBlake2b256, V>),
Expand All @@ -173,6 +178,7 @@ enum OccupiedEntryInner<'a, V> {
/// It is part of the [`Entry`] enum.
///
/// See also [`std::collections::hash_map::VacantEntry`].
#[allow(dead_code)]
#[derive(Debug)]
pub struct VacantEntry<'a, V> {
inner: VacantEntryInner<'a, V>,
Expand All @@ -183,6 +189,7 @@ impl<'a, V> VacantEntry<'a, V> {
/// and returns a mutable reference to it.
///
/// See also [`std::collections::hash_map::VacantEntry::insert`].
#[allow(dead_code)]
pub fn insert(self, value: V) -> &'a mut V {
match self.inner {
VacantEntryInner::Compact(c) => c.insert(value),
Expand All @@ -192,6 +199,7 @@ impl<'a, V> VacantEntry<'a, V> {
}

/// Hides compaction from users.
#[allow(dead_code)]
#[derive(Debug)]
enum VacantEntryInner<'a, V> {
Compact(StdVacantEntry<'a, CidV1DagCborBlake2b256, V>),
Expand Down
14 changes: 0 additions & 14 deletions src/db/car/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

use super::{CacheKey, RandomAccessFileReader, ZstdFrameCache};
use crate::blocks::{Tipset, TipsetKey};
use crate::db::PersistentStore;
use crate::utils::io::EitherMmapOrRandomAccessFile;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
Expand Down Expand Up @@ -142,19 +141,6 @@ where
}
}

impl<ReaderT> PersistentStore for AnyCar<ReaderT>
where
ReaderT: ReadAt,
{
fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
match self {
AnyCar::Forest(forest) => forest.put_keyed_persistent(k, block),
AnyCar::Plain(plain) => plain.put_keyed_persistent(k, block),
AnyCar::Memory(mem) => mem.put_keyed_persistent(k, block),
}
}
}

impl<ReaderT> From<super::ForestCar<ReaderT>> for AnyCar<ReaderT> {
fn from(car: super::ForestCar<ReaderT>) -> Self {
Self::Forest(car)
Expand Down
34 changes: 4 additions & 30 deletions src/db/car/forest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@

use super::{CacheKey, ZstdFrameCache};
use crate::blocks::{Tipset, TipsetKey};
use crate::db::PersistentStore;
use crate::db::car::RandomAccessFileReader;
use crate::db::car::plain::write_skip_frame_header_async;
use crate::utils::db::car_stream::{CarBlock, CarV1Header};
Expand All @@ -62,7 +61,7 @@ use futures::{Stream, TryStream, TryStreamExt as _};
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::to_vec;
use nunny::Vec as NonEmpty;
use parking_lot::{Mutex, RwLock};
use parking_lot::Mutex;
use positioned_io::{Cursor, ReadAt, ReadBytesAtExt, SizeCursor};
use std::io::{Seek, SeekFrom};
use std::path::Path;
Expand Down Expand Up @@ -96,7 +95,6 @@ pub struct ForestCar<ReaderT> {
indexed: index::Reader<positioned_io::Slice<ReaderT>>,
index_size_bytes: u32,
frame_cache: Arc<Mutex<ZstdFrameCache>>,
write_cache: Arc<RwLock<ahash::HashMap<Cid, Vec<u8>>>>,
roots: NonEmpty<Cid>,
}

Expand All @@ -116,7 +114,6 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
indexed,
index_size_bytes,
frame_cache: Arc::new(Mutex::new(ZstdFrameCache::default())),
write_cache: Arc::new(RwLock::new(ahash::HashMap::default())),
roots: header.roots,
})
}
Expand Down Expand Up @@ -178,7 +175,6 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
}),
index_size_bytes: self.index_size_bytes,
frame_cache: self.frame_cache,
write_cache: self.write_cache,
roots: self.roots,
}
}
Expand All @@ -205,11 +201,6 @@ where
{
#[tracing::instrument(level = "trace", skip(self))]
fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
// Return immediately if the value is cached.
if let Some(value) = self.write_cache.read().get(k) {
return Ok(Some(value.clone()));
}

let indexed = &self.indexed;
for position in indexed.get(*k)?.into_iter() {
let cache_query = self.frame_cache.lock().get(position, self.cache_key, *k);
Expand Down Expand Up @@ -246,26 +237,9 @@ where
Ok(None)
}

#[tracing::instrument(level = "trace", skip(self, block))]
fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
debug_assert!(
CarBlock {
cid: *k,
data: block.to_vec()
}
.valid()
);
self.write_cache.write().insert(*k, Vec::from(block));
Ok(())
}
}

impl<ReaderT> PersistentStore for ForestCar<ReaderT>
where
ReaderT: ReadAt,
{
fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
self.put_keyed(k, block)
/// Not supported, use [`super::ManyCar`] instead.
fn put_keyed(&self, _: &Cid, _: &[u8]) -> anyhow::Result<()> {
unreachable!("ForestCar is read-only, use ManyCar instead");
}
}

Expand Down
72 changes: 8 additions & 64 deletions src/db/car/plain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,20 @@
//! - CARv2 support
//! - A wrapper that abstracts over car formats for reading.

use crate::cid_collections::{CidHashMap, hash_map::Entry as CidHashMapEntry};
use crate::cid_collections::CidHashMap;
use crate::db::PersistentStore;
use crate::utils::db::car_stream::{CarV1Header, CarV2Header};
use crate::{
blocks::{Tipset, TipsetKey},
utils::encoding::from_slice_with_fallback,
};
use CidHashMapEntry::{Occupied, Vacant};
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use integer_encoding::{FixedIntReader, VarIntReader};
use nunny::Vec as NonEmpty;
use parking_lot::RwLock;
use positioned_io::ReadAt;
use std::ops::DerefMut;
use std::{
any::Any,
io::{
self, BufReader,
ErrorKind::{InvalidData, Unsupported},
Expand All @@ -101,7 +98,7 @@ use tracing::{debug, trace};
///
/// When a block is requested, [`PlainCar`] scrolls to that offset, and reads the block, on-demand.
///
/// Writes for new blocks (which don't exist in the CAR already) are currently cached in-memory.
/// Writes for new blocks (which don't exist in the CAR already) are not supported.
///
/// Random-access performance is expected to be poor, as the OS will have to load separate parts of
/// the file from disk, and flush it for each read. However, (near) linear access should be pretty
Expand All @@ -110,7 +107,6 @@ use tracing::{debug, trace};
/// See [module documentation](mod@self) for more.
pub struct PlainCar<ReaderT> {
reader: ReaderT,
write_cache: RwLock<CidHashMap<Vec<u8>>>,
index: RwLock<CidHashMap<UncompressedBlockDataLocation>>,
version: u64,
header_v1: CarV1Header,
Expand Down Expand Up @@ -162,7 +158,6 @@ impl<ReaderT: super::RandomAccessFileReader> PlainCar<ReaderT> {
debug!(num_blocks, "indexed CAR");
Ok(Self {
reader,
write_cache: RwLock::new(CidHashMap::new()),
index: RwLock::new(index),
version,
header_v1,
Expand Down Expand Up @@ -197,7 +192,6 @@ impl<ReaderT: super::RandomAccessFileReader> PlainCar<ReaderT> {
pub fn into_dyn(self) -> PlainCar<Box<dyn super::RandomAccessFileReader>> {
PlainCar {
reader: Box::new(self.reader),
write_cache: self.write_cache,
index: self.index,
version: self.version,
header_v1: self.header_v1,
Expand Down Expand Up @@ -227,39 +221,23 @@ where
{
#[tracing::instrument(level = "trace", skip(self))]
fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
match (self.index.read().get(k), self.write_cache.read().get(k)) {
(Some(_location), Some(_cached)) => {
trace!("evicting from write cache");
Ok(self.write_cache.write().remove(k))
}
(Some(UncompressedBlockDataLocation { offset, length }), None) => {
match self.index.read().get(k) {
Some(UncompressedBlockDataLocation { offset, length }) => {
trace!("fetching from disk");
let mut data = vec![0; usize::try_from(*length).unwrap()];
self.reader.read_exact_at(*offset, &mut data)?;
Ok(Some(data))
}
(None, Some(cached)) => {
trace!("getting from write cache");
Ok(Some(cached.clone()))
}
(None, None) => {
None => {
trace!("not found");
Ok(None)
}
}
}

/// # Panics
/// - If the write cache already contains different data with this CID
/// - See also [`Self::new`].
///
/// Note: Locks have to be acquired in exactly the same order as in `get`, otherwise a
/// deadlock is imminent in a multi-threaded context.
#[tracing::instrument(level = "trace", skip(self, block))]
fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
let mut index = self.index.write();
let mut cache = self.write_cache.write();
handle_write_cache(cache.deref_mut(), index.deref_mut(), k, block)
/// Not supported, use [`super::ManyCar`] instead.
fn put_keyed(&self, _: &Cid, _: &[u8]) -> anyhow::Result<()> {
unreachable!("PlainCar is read-only, use ManyCar instead");
}
}

Expand Down Expand Up @@ -289,40 +267,6 @@ pub struct CompressedBlockDataLocation {
pub location_in_frame: UncompressedBlockDataLocation,
}

/// # Panics
/// - If the write cache already contains different data with this CID
///
/// Note: This could potentially be enhanced with fine-grained read/write
/// locking, however the performance is acceptable for now.
fn handle_write_cache(
write_cache: &mut CidHashMap<Vec<u8>>,
index: &mut CidHashMap<impl Any>,
k: &Cid,
block: &[u8],
) -> anyhow::Result<()> {
match (index.get(k), write_cache.entry(*k)) {
(None, Occupied(already)) => match already.get() == block {
true => {
trace!("already in cache");
Ok(())
}
false => panic!("mismatched content on second write for CID {k}"),
},
(None, Vacant(vacant)) => {
trace!(bytes = block.len(), "insert into cache");
vacant.insert(block.to_owned());
Ok(())
}
(Some(_), Vacant(_)) => {
trace!("already on disk");
Ok(())
}
(Some(_), Occupied(_)) => {
unreachable!("we don't insert a CID in the write cache if it exists on disk")
}
}
}

fn cid_error_to_io_error(cid_error: cid::Error) -> io::Error {
match cid_error {
cid::Error::Io(io_error) => io_error,
Expand Down
11 changes: 8 additions & 3 deletions src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,10 @@ mod tests {

use crate::{
blocks::{Chain4U, RawBlockHeader, chain4u},
db::{MemoryDB, car::PlainCar},
db::{
MemoryDB,
car::{AnyCar, ManyCar},
},
networks::{self, ChainConfig},
};

Expand Down Expand Up @@ -1067,10 +1070,12 @@ mod tests {
let _ = (a, c1);
}

impl ChainStore<Chain4U<PlainCar<&'static [u8]>>> {
impl ChainStore<Chain4U<ManyCar>> {
fn _load(genesis_car: &'static [u8], genesis_cid: Cid) -> Self {
let db = Arc::new(Chain4U::with_blockstore(
PlainCar::new(genesis_car).unwrap(),
ManyCar::new(MemoryDB::default())
.with_read_only(AnyCar::new(genesis_car).unwrap())
.unwrap(),
));
let genesis_block_header = db.get_cbor(&genesis_cid).unwrap().unwrap();
ChainStore::new(
Expand Down
5 changes: 3 additions & 2 deletions src/tool/subcommands/snapshot_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use crate::blocks::Tipset;
use crate::chain::index::{ChainIndex, ResolveNullTipset};
use crate::cli_shared::snapshot;
use crate::daemon::bundle::load_actor_bundles;
use crate::db::PersistentStore;
use crate::db::car::forest::DEFAULT_FOREST_CAR_FRAME_SIZE;
use crate::db::car::{AnyCar, ManyCar};
use crate::db::{MemoryDB, PersistentStore};
use crate::interpreter::{MessageCallbackCtx, VMTrace};
use crate::ipld::stream_chain;
use crate::networks::{ChainConfig, NetworkChain, butterflynet, calibnet, mainnet};
Expand Down Expand Up @@ -172,7 +172,8 @@ impl SnapshotCommands {
for file in snapshot_files {
println!("Validating {}", file.display());
let result = async {
let store = AnyCar::try_from(file.as_path())?;
let store = ManyCar::new(MemoryDB::default())
.with_read_only(AnyCar::try_from(file.as_path())?)?;
validate_with_blockstore(
store.heaviest_tipset()?,
Arc::new(store),
Expand Down
Loading