From 7e66a9e7b8f6d7e75325f7de06c522407e70d665 Mon Sep 17 00:00:00 2001 From: sudo-shashank Date: Tue, 23 Jul 2024 16:50:34 +0530 Subject: [PATCH 1/7] Impl EthSubscribe and EthUnsubscribe --- src/rpc/methods/chain.rs | 22 +++++++++ src/rpc/mod.rs | 103 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index df3ec254be92..da7d06cce3bb 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -669,6 +669,25 @@ pub(crate) fn chain_notify( receiver } +pub(crate) fn new_heads(data: &crate::rpc::RPCState) -> Subscriber { + let (sender, receiver) = broadcast::channel(100); + + let mut subscriber = data.chain_store().publisher().subscribe(); + + tokio::spawn(async move { + while let Ok(v) = subscriber.recv().await { + let headers = match v { + HeadChange::Apply(ts) => ApiHeaders(ts.block_headers().clone().into()), + }; + if sender.send(headers).is_err() { + break; + } + } + }); + + receiver +} + fn load_api_messages_from_tipset( store: &impl Blockstore, tipset: &Tipset, @@ -773,6 +792,9 @@ pub struct ApiHeadChange { } lotus_json_with_self!(ApiHeadChange); +#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)] +pub struct ApiHeaders(#[serde(with = "crate::lotus_json")] pub Vec); + #[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)] #[serde(tag = "Type", content = "Val", rename_all = "snake_case")] pub enum PathChange> { diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index af1c4a5a12d5..32270e074f0c 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -9,6 +9,7 @@ mod request; pub use client::Client; pub use error::ServerError; use futures::FutureExt as _; +use methods::chain::new_heads; use reflect::Ctx; pub use reflect::{ApiPath, ApiPaths, RpcMethod, RpcMethodExt}; pub use request::Request; @@ -300,16 +301,21 @@ use crate::rpc::channel::RpcModule as FilRpcModule; pub use crate::rpc::channel::CANCEL_METHOD_NAME; use crate::blocks::Tipset; +use ethereum_types::H256; use fvm_ipld_blockstore::Blockstore; use jsonrpsee::{ + core::traits::IdProvider, server::{stop_channel, RpcModule, RpcServiceBuilder, Server, StopHandle, TowerServiceBuilder}, + types::SubscriptionId, Methods, }; use once_cell::sync::Lazy; +use rand::Rng; use std::env; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; +use tokio::sync::broadcast::error::RecvError; use tokio::sync::{mpsc, RwLock}; use tower::Service; @@ -328,6 +334,10 @@ static DEFAULT_REQUEST_TIMEOUT: Lazy = Lazy::new(|| { const MAX_REQUEST_BODY_SIZE: u32 = 64 * 1024 * 1024; const MAX_RESPONSE_BODY_SIZE: u32 = MAX_REQUEST_BODY_SIZE; +const ETH_SUBSCRIBE: &str = "Filecoin.EthSubscribe"; +const ETH_UNSUBSCRIBE: &str = "Filecoin.EthUnsubscribe"; +const ETH_SUBSCRIPTION: &str = "eth_subscription"; + /// This is where you store persistent data, or at least access to stateful /// data. pub struct RPCState { @@ -377,6 +387,25 @@ struct PerConnection { keystore: Arc>, } +#[derive(Debug, Copy, Clone)] +pub struct RandomHexStringIdProvider {} + +impl RandomHexStringIdProvider { + pub fn new() -> Self { + Self {} + } +} + +impl IdProvider for RandomHexStringIdProvider { + fn next_id(&self) -> SubscriptionId<'static> { + let mut bytes = [0u8; 32]; + let mut rng = rand::thread_rng(); + rng.fill(&mut bytes); + + SubscriptionId::Str(format!("{:#x}", H256::from(bytes)).into()) + } +} + pub async fn start_rpc(state: RPCState, rpc_endpoint: SocketAddr) -> anyhow::Result<()> where DB: Blockstore + Send + Sync + 'static, @@ -403,6 +432,7 @@ where // Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors` .max_request_body_size(MAX_REQUEST_BODY_SIZE) .max_response_body_size(MAX_RESPONSE_BODY_SIZE) + .set_id_provider(RandomHexStringIdProvider::new()) .to_service_builder(), keystore, }; @@ -504,6 +534,79 @@ where }; } for_each_method!(register); + + module.register_subscription( + ETH_SUBSCRIBE, + ETH_SUBSCRIPTION, + ETH_UNSUBSCRIBE, + |params, pending, ctx, _| async move { + let event_types = match params.parse::>() { + Ok(v) => v, + Err(e) => { + pending + .reject(jsonrpsee::types::ErrorObjectOwned::from(e)) + .await; + // If the subscription has not been "accepted" then + // the return value will be "ignored" as it's not + // allowed to send out any further notifications on + // on the subscription. + return Ok(()); + } + }; + // `event_types` is one OR more of: + // - "newHeads": notify when new blocks arrive + // - "pendingTransactions": notify when new messages arrive in the message pool + // - "logs": notify new event logs that match a criteria + + tracing::trace!("Subscribing to events: {:?}", event_types); + + let mut receiver = new_heads(&ctx); + tokio::spawn(async move { + // Mark the subscription is accepted after the params has been parsed successful. + // This is actually responds the underlying RPC method call and may fail if the + // connection is closed. + let sink = pending.accept().await.unwrap(); + + tracing::trace!("Subscription started (id: {:?})", sink.subscription_id()); + + loop { + tokio::select! { + action = receiver.recv() => { + match action { + Ok(v) => { + match jsonrpsee::SubscriptionMessage::from_json(&v) { + Ok(msg) => { + // This fails only if the connection is closed + if sink.send(msg).await.is_err() { + break; + } + } + Err(e) => { + tracing::error!("Failed to serialize message: {:?}", e); + break; + } + } + } + Err(RecvError::Closed) => { + break; + } + Err(RecvError::Lagged(_)) => { + } + } + } + _ = sink.closed() => { + break; + } + } + } + + tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); + }); + + Ok(()) + }, + ).unwrap(); + module } From 3f5257119e7c1c4369dd2c2b7ed189816d7735f2 Mon Sep 17 00:00:00 2001 From: sudo-shashank Date: Tue, 23 Jul 2024 17:19:32 +0530 Subject: [PATCH 2/7] register subscription in a new module --- src/rpc/mod.rs | 148 +++++++++++++++++++++++++------------------------ 1 file changed, 76 insertions(+), 72 deletions(-) diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 32270e074f0c..3c144db273b6 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -423,6 +423,82 @@ where })?; module.merge(pubsub_module)?; + let mut sub_module = RpcModule::from_arc(state); + + sub_module.register_subscription( + ETH_SUBSCRIBE, + ETH_SUBSCRIPTION, + ETH_UNSUBSCRIBE, + |params, pending, ctx, _| async move { + let event_types = match params.parse::>() { + Ok(v) => v, + Err(e) => { + pending + .reject(jsonrpsee::types::ErrorObjectOwned::from(e)) + .await; + // If the subscription has not been "accepted" then + // the return value will be "ignored" as it's not + // allowed to send out any further notifications on + // on the subscription. + return Ok(()); + } + }; + // `event_types` is one OR more of: + // - "newHeads": notify when new blocks arrive + // - "pendingTransactions": notify when new messages arrive in the message pool + // - "logs": notify new event logs that match a criteria + + tracing::trace!("Subscribing to events: {:?}", event_types); + + let mut receiver = new_heads(&ctx); + tokio::spawn(async move { + // Mark the subscription is accepted after the params has been parsed successful. + // This is actually responds the underlying RPC method call and may fail if the + // connection is closed. + let sink = pending.accept().await.unwrap(); + + tracing::trace!("Subscription started (id: {:?})", sink.subscription_id()); + + loop { + tokio::select! { + action = receiver.recv() => { + match action { + Ok(v) => { + match jsonrpsee::SubscriptionMessage::from_json(&v) { + Ok(msg) => { + // This fails only if the connection is closed + if sink.send(msg).await.is_err() { + break; + } + } + Err(e) => { + tracing::error!("Failed to serialize message: {:?}", e); + break; + } + } + } + Err(RecvError::Closed) => { + break; + } + Err(RecvError::Lagged(_)) => { + } + } + } + _ = sink.closed() => { + break; + } + } + } + + tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); + }); + + Ok(()) + }, + ).unwrap(); + + module.merge(sub_module)?; + let (stop_handle, _server_handle) = stop_channel(); let per_conn = PerConnection { @@ -535,78 +611,6 @@ where } for_each_method!(register); - module.register_subscription( - ETH_SUBSCRIBE, - ETH_SUBSCRIPTION, - ETH_UNSUBSCRIBE, - |params, pending, ctx, _| async move { - let event_types = match params.parse::>() { - Ok(v) => v, - Err(e) => { - pending - .reject(jsonrpsee::types::ErrorObjectOwned::from(e)) - .await; - // If the subscription has not been "accepted" then - // the return value will be "ignored" as it's not - // allowed to send out any further notifications on - // on the subscription. - return Ok(()); - } - }; - // `event_types` is one OR more of: - // - "newHeads": notify when new blocks arrive - // - "pendingTransactions": notify when new messages arrive in the message pool - // - "logs": notify new event logs that match a criteria - - tracing::trace!("Subscribing to events: {:?}", event_types); - - let mut receiver = new_heads(&ctx); - tokio::spawn(async move { - // Mark the subscription is accepted after the params has been parsed successful. - // This is actually responds the underlying RPC method call and may fail if the - // connection is closed. - let sink = pending.accept().await.unwrap(); - - tracing::trace!("Subscription started (id: {:?})", sink.subscription_id()); - - loop { - tokio::select! { - action = receiver.recv() => { - match action { - Ok(v) => { - match jsonrpsee::SubscriptionMessage::from_json(&v) { - Ok(msg) => { - // This fails only if the connection is closed - if sink.send(msg).await.is_err() { - break; - } - } - Err(e) => { - tracing::error!("Failed to serialize message: {:?}", e); - break; - } - } - } - Err(RecvError::Closed) => { - break; - } - Err(RecvError::Lagged(_)) => { - } - } - } - _ = sink.closed() => { - break; - } - } - } - - tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); - }); - - Ok(()) - }, - ).unwrap(); - module } From f7fe7a6434c2439864a5300d9fdde1bd06f7bb15 Mon Sep 17 00:00:00 2001 From: sudo-shashank Date: Tue, 23 Jul 2024 18:07:34 +0530 Subject: [PATCH 3/7] give access --- src/rpc/auth_layer.rs | 4 +++- src/rpc/methods/eth.rs | 3 +++ src/rpc/mod.rs | 12 +++--------- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/rpc/auth_layer.rs b/src/rpc/auth_layer.rs index d18b3218443c..4fbff3020f41 100644 --- a/src/rpc/auth_layer.rs +++ b/src/rpc/auth_layer.rs @@ -3,7 +3,7 @@ use crate::auth::{verify_token, JWT_IDENTIFIER}; use crate::key_management::KeyStore; -use crate::rpc::{chain, Permission, RpcMethod as _, CANCEL_METHOD_NAME}; +use crate::rpc::{chain, eth, Permission, RpcMethod as _, CANCEL_METHOD_NAME}; use ahash::{HashMap, HashMapExt as _}; use futures::future::BoxFuture; use futures::FutureExt; @@ -31,6 +31,8 @@ static METHOD_NAME2REQUIRED_PERMISSION: Lazy> = Lazy:: super::for_each_method!(insert); access.insert(chain::CHAIN_NOTIFY, Permission::Read); + access.insert(eth::ETH_SUBSCRIBE, Permission::Read); + access.insert(eth::ETH_UNSUBSCRIBE, Permission::Read); access.insert(CANCEL_METHOD_NAME, Permission::Read); access diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index d4a5892a55c7..49b02baa2a2f 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -46,6 +46,9 @@ use serde::{Deserialize, Serialize}; use std::str::FromStr; use std::{ops::Add, sync::Arc}; +pub const ETH_SUBSCRIBE: &str = "Filecoin.EthSubscribe"; +pub const ETH_UNSUBSCRIBE: &str = "Filecoin.EthUnsubscribe"; + const MASKED_ID_PREFIX: [u8; 12] = [0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; /// Ethereum Bloom filter size in bits. diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 3c144db273b6..6c3ca7d79fe3 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -334,8 +334,6 @@ static DEFAULT_REQUEST_TIMEOUT: Lazy = Lazy::new(|| { const MAX_REQUEST_BODY_SIZE: u32 = 64 * 1024 * 1024; const MAX_RESPONSE_BODY_SIZE: u32 = MAX_REQUEST_BODY_SIZE; -const ETH_SUBSCRIBE: &str = "Filecoin.EthSubscribe"; -const ETH_UNSUBSCRIBE: &str = "Filecoin.EthUnsubscribe"; const ETH_SUBSCRIPTION: &str = "eth_subscription"; /// This is where you store persistent data, or at least access to stateful @@ -423,12 +421,10 @@ where })?; module.merge(pubsub_module)?; - let mut sub_module = RpcModule::from_arc(state); - - sub_module.register_subscription( - ETH_SUBSCRIBE, + module.register_subscription( + eth::ETH_SUBSCRIBE, ETH_SUBSCRIPTION, - ETH_UNSUBSCRIBE, + eth::ETH_UNSUBSCRIBE, |params, pending, ctx, _| async move { let event_types = match params.parse::>() { Ok(v) => v, @@ -497,8 +493,6 @@ where }, ).unwrap(); - module.merge(sub_module)?; - let (stop_handle, _server_handle) = stop_channel(); let per_conn = PerConnection { From 4acecde0d2ccc28bd6e39bf7357e50f61e5be318 Mon Sep 17 00:00:00 2001 From: sudo-shashank Date: Tue, 23 Jul 2024 18:20:51 +0530 Subject: [PATCH 4/7] give access to ETH_SUBSCRIPTION --- src/rpc/auth_layer.rs | 1 + src/rpc/methods/eth.rs | 1 + src/rpc/mod.rs | 4 +--- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rpc/auth_layer.rs b/src/rpc/auth_layer.rs index 4fbff3020f41..f5ab416f1d05 100644 --- a/src/rpc/auth_layer.rs +++ b/src/rpc/auth_layer.rs @@ -32,6 +32,7 @@ static METHOD_NAME2REQUIRED_PERMISSION: Lazy> = Lazy:: access.insert(chain::CHAIN_NOTIFY, Permission::Read); access.insert(eth::ETH_SUBSCRIBE, Permission::Read); + access.insert(eth::ETH_SUBSCRIPTION, Permission::Read); access.insert(eth::ETH_UNSUBSCRIBE, Permission::Read); access.insert(CANCEL_METHOD_NAME, Permission::Read); diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 49b02baa2a2f..7ebd19d7c7c5 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -47,6 +47,7 @@ use std::str::FromStr; use std::{ops::Add, sync::Arc}; pub const ETH_SUBSCRIBE: &str = "Filecoin.EthSubscribe"; +pub const ETH_SUBSCRIPTION: &str = "Filecoin.EthSubscription"; pub const ETH_UNSUBSCRIBE: &str = "Filecoin.EthUnsubscribe"; const MASKED_ID_PREFIX: [u8; 12] = [0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 6c3ca7d79fe3..483f49502352 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -334,8 +334,6 @@ static DEFAULT_REQUEST_TIMEOUT: Lazy = Lazy::new(|| { const MAX_REQUEST_BODY_SIZE: u32 = 64 * 1024 * 1024; const MAX_RESPONSE_BODY_SIZE: u32 = MAX_REQUEST_BODY_SIZE; -const ETH_SUBSCRIPTION: &str = "eth_subscription"; - /// This is where you store persistent data, or at least access to stateful /// data. pub struct RPCState { @@ -423,7 +421,7 @@ where module.register_subscription( eth::ETH_SUBSCRIBE, - ETH_SUBSCRIPTION, + eth::ETH_SUBSCRIPTION, eth::ETH_UNSUBSCRIBE, |params, pending, ctx, _| async move { let event_types = match params.parse::>() { From 0147e4d80fd10a3b4456ca759eb888b80b6c6d20 Mon Sep 17 00:00:00 2001 From: sudo-shashank Date: Wed, 24 Jul 2024 14:08:16 +0530 Subject: [PATCH 5/7] impl pending txn sub --- src/rpc/methods/chain.rs | 24 ++++++ src/rpc/mod.rs | 161 ++++++++++++++++++++++----------------- 2 files changed, 115 insertions(+), 70 deletions(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index da7d06cce3bb..5706e0956364 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -688,6 +688,30 @@ pub(crate) fn new_heads(data: &crate::rpc::RPCState) -> Subs receiver } +pub(crate) fn pending_txn( + data: Arc>, +) -> Subscriber> { + let (sender, receiver) = broadcast::channel(100); + + let mut subscriber = data.chain_store().publisher().subscribe(); + + tokio::spawn(async move { + while let Ok(v) = subscriber.recv().await { + let messages = match v { + HeadChange::Apply(ts) => { + load_api_messages_from_tipset(Arc::clone(&data).store(), &ts).unwrap() + } + }; + + if sender.send(messages).is_err() { + break; + } + } + }); + + receiver +} + fn load_api_messages_from_tipset( store: &impl Blockstore, tipset: &Tipset, diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 483f49502352..91ad8f4b5776 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -9,7 +9,7 @@ mod request; pub use client::Client; pub use error::ServerError; use futures::FutureExt as _; -use methods::chain::new_heads; +use methods::chain::{new_heads, pending_txn}; use reflect::Ctx; pub use reflect::{ApiPath, ApiPaths, RpcMethod, RpcMethodExt}; pub use request::Request; @@ -18,6 +18,7 @@ mod reflect; pub mod types; pub use methods::*; use reflect::Permission; +use tokio::sync::broadcast::Receiver; /// Protocol or transport-specific error pub use jsonrpsee::core::ClientError; @@ -402,6 +403,45 @@ impl IdProvider for RandomHexStringIdProvider { } } +enum ReceiverType { + Heads(Receiver), + Txn(Receiver>), +} + +async fn handle_subscription(mut rx: Receiver, sink: jsonrpsee::SubscriptionSink) +where + T: serde::Serialize + Clone, +{ + loop { + let action = rx.recv().await; + + tokio::select! { + action = async { action } => { + match action { + Ok(v) => { + match jsonrpsee::SubscriptionMessage::from_json(&v) { + Ok(msg) => { + // This fails only if the connection is closed + if sink.send(msg).await.is_err() { + break; + } + } + Err(e) => { + tracing::error!("Failed to serialize message: {:?}", e); + break; + } + } + } + Err(RecvError::Closed) => break, + Err(RecvError::Lagged(_)) => {}, + } + } + _ = sink.closed() => break, + } + } + tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); +} + pub async fn start_rpc(state: RPCState, rpc_endpoint: SocketAddr) -> anyhow::Result<()> where DB: Blockstore + Send + Sync + 'static, @@ -419,77 +459,58 @@ where })?; module.merge(pubsub_module)?; - module.register_subscription( - eth::ETH_SUBSCRIBE, - eth::ETH_SUBSCRIPTION, - eth::ETH_UNSUBSCRIBE, - |params, pending, ctx, _| async move { - let event_types = match params.parse::>() { - Ok(v) => v, - Err(e) => { - pending - .reject(jsonrpsee::types::ErrorObjectOwned::from(e)) - .await; - // If the subscription has not been "accepted" then - // the return value will be "ignored" as it's not - // allowed to send out any further notifications on - // on the subscription. - return Ok(()); - } - }; - // `event_types` is one OR more of: - // - "newHeads": notify when new blocks arrive - // - "pendingTransactions": notify when new messages arrive in the message pool - // - "logs": notify new event logs that match a criteria - - tracing::trace!("Subscribing to events: {:?}", event_types); - - let mut receiver = new_heads(&ctx); - tokio::spawn(async move { - // Mark the subscription is accepted after the params has been parsed successful. - // This is actually responds the underlying RPC method call and may fail if the - // connection is closed. - let sink = pending.accept().await.unwrap(); - - tracing::trace!("Subscription started (id: {:?})", sink.subscription_id()); - - loop { - tokio::select! { - action = receiver.recv() => { - match action { - Ok(v) => { - match jsonrpsee::SubscriptionMessage::from_json(&v) { - Ok(msg) => { - // This fails only if the connection is closed - if sink.send(msg).await.is_err() { - break; - } - } - Err(e) => { - tracing::error!("Failed to serialize message: {:?}", e); - break; - } - } - } - Err(RecvError::Closed) => { - break; - } - Err(RecvError::Lagged(_)) => { - } - } - } - _ = sink.closed() => { - break; - } + module + .register_subscription( + eth::ETH_SUBSCRIBE, + eth::ETH_SUBSCRIPTION, + eth::ETH_UNSUBSCRIBE, + |params, pending, ctx, _| async move { + let event_types = match params.parse::>() { + Ok(v) => v, + Err(e) => { + pending + .reject(jsonrpsee::types::ErrorObjectOwned::from(e)) + .await; + // If the subscription has not been "accepted" then + // the return value will be "ignored" as it's not + // allowed to send out any further notifications on + // on the subscription. + return Ok(()); } - } - - tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); - }); + }; + // `event_types` is one OR more of: + // - "newHeads": notify when new blocks arrive + // - "pendingTransactions": notify when new messages arrive in the message pool + // - "logs": notify new event logs that match a criteria + + tracing::trace!("Subscribing to events: {:?}", event_types); + + let receiver = event_types + .iter() + .find_map(|event| match event.as_str() { + "newHeads" => Some(ReceiverType::Heads(new_heads(&ctx))), + "pendingTransactions" => Some(ReceiverType::Txn(pending_txn(ctx.clone()))), + _ => None, + }) + .expect("No valid event type found"); + + tokio::spawn(async move { + // Mark the subscription is accepted after the params has been parsed successful. + // This is actually responds the underlying RPC method call and may fail if the + // connection is closed. + let sink = pending.accept().await.unwrap(); + tracing::trace!("Subscription started (id: {:?})", sink.subscription_id()); + + match receiver { + ReceiverType::Heads(rx) => handle_subscription(rx, sink).await, + ReceiverType::Txn(rx) => handle_subscription(rx, sink).await, + } + }); - Ok(()) - }, - ).unwrap(); + Ok(()) + }, + ) + .unwrap(); let (stop_handle, _server_handle) = stop_channel(); From 6e29f5ce434367b0e3547d3dbd795f2a941a18ce Mon Sep 17 00:00:00 2001 From: sudo-shashank Date: Wed, 24 Jul 2024 15:19:32 +0530 Subject: [PATCH 6/7] fix pending txn --- src/rpc/methods/chain.rs | 10 ++++++---- src/rpc/mod.rs | 8 +++++--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 5706e0956364..12a9385b9b41 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -15,6 +15,7 @@ use crate::ipld::DfsIter; use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json}; use crate::lotus_json::{lotus_json_with_self, HasLotusJson, LotusJson}; use crate::message::{ChainMessage, SignedMessage}; +use crate::message_pool::Provider; use crate::rpc::types::ApiTipsetKey; use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError}; use crate::shim::clock::ChainEpoch; @@ -690,16 +691,17 @@ pub(crate) fn new_heads(data: &crate::rpc::RPCState) -> Subs pub(crate) fn pending_txn( data: Arc>, -) -> Subscriber> { +) -> Subscriber> { let (sender, receiver) = broadcast::channel(100); - let mut subscriber = data.chain_store().publisher().subscribe(); + let mut subscriber = data.mpool.api.subscribe_head_changes(); tokio::spawn(async move { while let Ok(v) = subscriber.recv().await { let messages = match v { - HeadChange::Apply(ts) => { - load_api_messages_from_tipset(Arc::clone(&data).store(), &ts).unwrap() + HeadChange::Apply(_ts) => { + let (pending, _mpts) = data.mpool.pending().unwrap(); + pending } }; diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 91ad8f4b5776..a682f324e32d 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -405,7 +405,7 @@ impl IdProvider for RandomHexStringIdProvider { enum ReceiverType { Heads(Receiver), - Txn(Receiver>), + Txn(Receiver>), } async fn handle_subscription(mut rx: Receiver, sink: jsonrpsee::SubscriptionSink) @@ -480,7 +480,7 @@ where }; // `event_types` is one OR more of: // - "newHeads": notify when new blocks arrive - // - "pendingTransactions": notify when new messages arrive in the message pool + // - "newPendingTransactions": notify when new messages arrive in the message pool // - "logs": notify new event logs that match a criteria tracing::trace!("Subscribing to events: {:?}", event_types); @@ -489,7 +489,9 @@ where .iter() .find_map(|event| match event.as_str() { "newHeads" => Some(ReceiverType::Heads(new_heads(&ctx))), - "pendingTransactions" => Some(ReceiverType::Txn(pending_txn(ctx.clone()))), + "newPendingTransactions" => { + Some(ReceiverType::Txn(pending_txn(ctx.clone()))) + } _ => None, }) .expect("No valid event type found"); From 64ae4d27a317e41f6ddb84041b6ad5d9a995010e Mon Sep 17 00:00:00 2001 From: sudo-shashank Date: Wed, 24 Jul 2024 16:19:06 +0530 Subject: [PATCH 7/7] fix newPendingTransactions --- src/message_pool/msgpool/msg_pool.rs | 2 +- src/rpc/methods/chain.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 22b8029ff8c3..cd61d8c8647c 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -188,7 +188,7 @@ pub struct MessagePool { /// Acts as a signal to republish messages from the republished set of /// messages pub repub_trigger: flume::Sender<()>, - local_msgs: Arc>>, + pub local_msgs: Arc>>, /// Configurable parameters of the message pool pub config: MpoolConfig, /// Chain configuration diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 12a9385b9b41..6156ca1ba7cc 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -700,7 +700,8 @@ pub(crate) fn pending_txn( while let Ok(v) = subscriber.recv().await { let messages = match v { HeadChange::Apply(_ts) => { - let (pending, _mpts) = data.mpool.pending().unwrap(); + let local_msgs = data.mpool.local_msgs.write(); + let pending = local_msgs.iter().cloned().collect::>(); pending } };