diff --git a/src/db/car/many.rs b/src/db/car/many.rs index 0012a86fb523..ec0cb31927b9 100644 --- a/src/db/car/many.rs +++ b/src/db/car/many.rs @@ -317,7 +317,7 @@ impl super::super::HeaviestTipsetKeyProvider for } impl BlockstoreWriteOpsSubscribable for ManyCar { - fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver)>> { + fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver> { self.writer().subscribe_write_ops() } diff --git a/src/db/gc/snapshot.rs b/src/db/gc/snapshot.rs index 8d9e1d7097ca..a38f4745eb85 100644 --- a/src/db/gc/snapshot.rs +++ b/src/db/gc/snapshot.rs @@ -71,7 +71,7 @@ pub struct SnapshotGarbageCollector { blessed_lite_snapshot: RwLock>, chain_follower: Arc>, // On mainnet, it takes ~50MiB-200MiB RAM, depending on the time cost of snapshot export - memory_db: RwLock>>>, + memory_db: RwLock>>, memory_db_head_key: RwLock>, exported_head_key: RwLock>, trigger_tx: flume::Sender<()>, diff --git a/src/db/mod.rs b/src/db/mod.rs index 185823da54ef..66da86ed8280 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -196,7 +196,7 @@ pub trait HeaviestTipsetKeyProvider { #[auto_impl::auto_impl(&, Arc)] pub trait BlockstoreWriteOpsSubscribable { - fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver)>>; + fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver>; fn unsubscribe_write_ops(&self); } diff --git a/src/db/parity_db.rs b/src/db/parity_db.rs index e385c9b31471..d44cae1e55a2 100644 --- a/src/db/parity_db.rs +++ b/src/db/parity_db.rs @@ -83,7 +83,7 @@ impl DbColumn { } } -type WriteOpsBroadcastTxSender = tokio::sync::broadcast::Sender)>>; +type WriteOpsBroadcastTxSender = tokio::sync::broadcast::Sender>; pub struct ParityDb { pub db: parity_db::Db, @@ -242,7 +242,7 @@ impl Blockstore for ParityDb { self.write_to_column(k.to_bytes(), block, column)?; match &*self.write_ops_broadcast_tx.read() { Some(tx) if has_subscribers(tx) => { - let _ = tx.send(vec![(*k, block.to_vec())]); + let _ = tx.send(vec![(*k, bytes::Bytes::copy_from_slice(block))]); } _ => {} } @@ -263,7 +263,7 @@ impl Blockstore for ParityDb { let column = Self::choose_column(&k); let v = v.as_ref().to_vec(); if has_subscribers { - values_for_subscriber.push((k, v.clone())); + values_for_subscriber.push((k, bytes::Bytes::copy_from_slice(&v))); } (column, k.to_bytes(), v) }); @@ -372,7 +372,7 @@ impl ParityDb { } impl super::BlockstoreWriteOpsSubscribable for ParityDb { - fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver)>> { + fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver> { let tx_lock = self.write_ops_broadcast_tx.read(); if let Some(tx) = &*tx_lock { return tx.subscribe(); @@ -557,14 +557,9 @@ mod test { for (idx, cid) in cids.iter().enumerate() { let data_entry = &data[idx]; db.put_keyed(cid, data_entry).unwrap(); - assert_eq!( - rx1.blocking_recv().unwrap(), - vec![(*cid, data_entry.clone())] - ); - assert_eq!( - rx2.blocking_recv().unwrap(), - vec![(*cid, data_entry.clone())] - ); + let expected = vec![(*cid, bytes::Bytes::copy_from_slice(data_entry))]; + assert_eq!(rx1.blocking_recv().unwrap(), expected); + assert_eq!(rx2.blocking_recv().unwrap(), expected); } drop(rx1); diff --git a/src/db/parity_db/gc.rs b/src/db/parity_db/gc.rs index f50c6eb1a38a..a0d501f6ff9a 100644 --- a/src/db/parity_db/gc.rs +++ b/src/db/parity_db/gc.rs @@ -184,7 +184,7 @@ impl DBStatistics for GarbageCollectableParityDb { } impl BlockstoreWriteOpsSubscribable for GarbageCollectableParityDb { - fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver)>> { + fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver> { BlockstoreWriteOpsSubscribable::subscribe_write_ops(&*self.db.read()) }