diff --git a/src/message_pool/msgpool/mod.rs b/src/message_pool/msgpool/mod.rs index a2ddf345c128..243fd3e2d6a3 100644 --- a/src/message_pool/msgpool/mod.rs +++ b/src/message_pool/msgpool/mod.rs @@ -21,7 +21,7 @@ use crate::utils::get_size::CidWrapper; use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; use cid::Cid; use fvm_ipld_encoding::to_vec; -use parking_lot::{Mutex, RwLock as SyncRwLock}; +use parking_lot::RwLock as SyncRwLock; use tracing::error; use utils::{get_base_fee_lower_bound, recover_sig}; @@ -57,7 +57,7 @@ async fn republish_pending_messages( api: &T, network_sender: &flume::Sender, pending: &SyncRwLock>, - cur_tipset: &Mutex>, + cur_tipset: &SyncRwLock>, republished: &SyncRwLock>, local_addrs: &SyncRwLock>, chain_config: &ChainConfig, @@ -65,7 +65,7 @@ async fn republish_pending_messages( where T: Provider, { - let ts = cur_tipset.lock().clone(); + let ts = cur_tipset.read().clone(); let mut pending_map: HashMap> = HashMap::new(); republished.write().clear(); @@ -216,7 +216,7 @@ pub async fn head_change( repub_trigger: Arc>, republished: &SyncRwLock>, pending: &SyncRwLock>, - cur_tipset: &Mutex>, + cur_tipset: &SyncRwLock>, revert: Vec, apply: Vec, ) -> Result<(), Error> @@ -227,7 +227,7 @@ where let mut rmsgs: HashMap> = HashMap::new(); for ts in revert { let pts = api.load_tipset(ts.parents())?; - *cur_tipset.lock() = pts; + *cur_tipset.write() = pts; let mut msgs: Vec = Vec::new(); for block in ts.block_headers() { @@ -266,7 +266,7 @@ where } } } - *cur_tipset.lock() = Arc::new(ts); + *cur_tipset.write() = Arc::new(ts); } if repub { repub_trigger @@ -276,7 +276,7 @@ where } for (_, hm) in rmsgs { for (_, msg) in hm { - let sequence = get_state_sequence(api, &msg.from(), &cur_tipset.lock().clone())?; + let sequence = get_state_sequence(api, &msg.from(), &cur_tipset.read().clone())?; if let Err(e) = add_helper(api, bls_sig_cache, pending, msg, sequence) { error!("Failed to read message from reorg to mpool: {}", e); } @@ -616,7 +616,7 @@ pub mod tests { // sleep allows for async block to update mpool's cur_tipset tokio::time::sleep(Duration::new(2, 0)).await; - let cur_ts = mpool.cur_tipset.lock().clone(); + let cur_ts = mpool.current_tipset(); assert_eq!(cur_ts.as_ref(), &tipset); } diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 3c0d70a361ec..da7f70c4f670 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -32,7 +32,7 @@ use futures::StreamExt; use fvm_ipld_encoding::to_vec; use itertools::Itertools; use nonzero_ext::nonzero; -use parking_lot::{Mutex, RwLock as SyncRwLock}; +use parking_lot::RwLock as SyncRwLock; use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval}; use tracing::warn; @@ -177,7 +177,7 @@ pub struct MessagePool { /// A map of pending messages where the key is the address pub pending: Arc>>, /// The current tipset (a set of blocks) - pub cur_tipset: Arc>>, + pub cur_tipset: Arc>>, /// The underlying provider pub api: Arc, /// Sender half to send messages to other components @@ -202,6 +202,11 @@ impl MessagePool where T: Provider, { + /// Gets the current tipset + pub fn current_tipset(&self) -> Arc { + self.cur_tipset.read().clone() + } + /// Add a signed message to the pool and its address. fn add_local(&self, m: SignedMessage) -> Result<(), Error> { self.local_addrs.write().push(m.from()); @@ -214,7 +219,7 @@ where pub async fn push(&self, msg: SignedMessage) -> Result { self.check_message(&msg)?; let cid = msg.cid(); - let cur_ts = self.cur_tipset.lock().clone(); + let cur_ts = self.current_tipset(); let publish = self.add_tipset(msg.clone(), &cur_ts, true)?; let msg_ser = to_vec(&msg)?; let network_name = self.chain_config.network.genesis_name(); @@ -249,10 +254,8 @@ where /// fits the parameters to be pushed to the `MessagePool`. pub fn add(&self, msg: SignedMessage) -> Result<(), Error> { self.check_message(&msg)?; - - let tip = self.cur_tipset.lock().clone(); - - self.add_tipset(msg, &tip, false)?; + let ts = self.current_tipset(); + self.add_tipset(msg, &ts, false)?; Ok(()) } @@ -320,7 +323,7 @@ where /// the pending hash-map. fn add_helper(&self, msg: SignedMessage) -> Result<(), Error> { let from = msg.from(); - let cur_ts = self.cur_tipset.lock().clone(); + let cur_ts = self.current_tipset(); add_helper( self.api.as_ref(), self.bls_sig_cache.as_ref(), @@ -333,7 +336,7 @@ where /// Get the sequence for a given address, return Error if there is a failure /// to retrieve the respective sequence. pub fn get_sequence(&self, addr: &Address) -> Result { - let cur_ts = self.cur_tipset.lock().clone(); + let cur_ts = self.current_tipset(); let sequence = self.get_state_sequence(addr, &cur_ts)?; @@ -378,7 +381,7 @@ where ) } - let cur_ts = self.cur_tipset.lock().clone(); + let cur_ts = self.current_tipset(); Ok((out, cur_ts)) } @@ -471,7 +474,7 @@ where { let local_addrs = Arc::new(SyncRwLock::new(Vec::new())); let pending = Arc::new(SyncRwLock::new(HashMap::new())); - let tipset = Arc::new(Mutex::new(api.get_heaviest_tipset())); + let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset())); let bls_sig_cache = Arc::new(SizeTrackingLruCache::new_with_default_metrics_registry( "bls_sig_cache".into(), BLS_SIG_CACHE_SIZE, diff --git a/src/message_pool/msgpool/selection.rs b/src/message_pool/msgpool/selection.rs index 42dfb93c34e9..0910254312b8 100644 --- a/src/message_pool/msgpool/selection.rs +++ b/src/message_pool/msgpool/selection.rs @@ -290,7 +290,7 @@ where /// for inclusion from the pool, given the ticket quality of a miner. /// This method selects messages for including in a block. pub fn select_messages(&self, ts: &Tipset, tq: f64) -> Result, Error> { - let cur_ts = self.cur_tipset.lock().clone(); + let cur_ts = self.current_tipset(); // if the ticket quality is high enough that the first block has higher // probability than any other block, then we don't bother with optimal // selection because the first block will always have higher effective diff --git a/src/rpc/methods/gas.rs b/src/rpc/methods/gas.rs index 174e19e760a0..341d31898948 100644 --- a/src/rpc/methods/gas.rs +++ b/src/rpc/methods/gas.rs @@ -221,7 +221,7 @@ impl GasEstimateGasLimit { .map(|s| s.into_iter().map(ChainMessage::Signed).collect::>()) .unwrap_or_default(); - let ts = data.mpool.cur_tipset.lock().clone(); + let ts = data.mpool.current_tipset(); // Pretend that the message is signed. This has an influence on the gas // cost. We obviously can't generate a valid signature. Instead, we just // fill the signature with zeros. The validity is not checked.