diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 8aba9c54f..1a818b01e 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -21,7 +21,6 @@ use crate::Membership; use crate::RaftTypeConfig; use crate::StorageError; use crate::async_runtime::MpscReceiver; -use crate::async_runtime::MpscUnboundedSender; use crate::async_runtime::OneshotSender; use crate::async_runtime::TryRecvError; use crate::async_runtime::watch::WatchSender; @@ -92,9 +91,10 @@ use crate::raft_state::io_state::io_id::IOId; use crate::raft_state::io_state::log_io_id::LogIOId; use crate::replication::ReplicationCore; use crate::replication::ReplicationSessionId; +use crate::replication::event_watcher::EventWatcher; use crate::replication::replication_context::ReplicationContext; use crate::replication::replication_handle::ReplicationHandle; -use crate::replication::request::Replicate; +use crate::replication::request::Data; use crate::replication::snapshot_transmitter::SnapshotTransmitter; use crate::runtime::RaftRuntime; use crate::storage::IOFlushed; @@ -106,6 +106,7 @@ use crate::type_config::alias::MpscReceiverOf; use crate::type_config::alias::MpscSenderOf; use crate::type_config::alias::OneshotReceiverOf; use crate::type_config::alias::VoteOf; +use crate::type_config::alias::WatchReceiverOf; use crate::type_config::alias::WatchSenderOf; use crate::type_config::alias::WriteResponderOf; use crate::type_config::async_runtime::mpsc::MpscSender; @@ -197,6 +198,10 @@ where /// This is used by IOFlushed callbacks to report IO completion in a synchronous manner. pub(crate) tx_io_completed: WatchSenderOf, StorageError>>, + /// For broadcast committed log id to replication task. + pub(crate) committed_tx: WatchSenderOf>>, + pub(crate) _committed_rx: WatchReceiverOf>>, + pub(crate) tx_metrics: WatchSenderOf>, pub(crate) tx_data_metrics: WatchSenderOf>, pub(crate) tx_server_metrics: WatchSenderOf>, @@ -867,6 +872,8 @@ where let session_id = ReplicationSessionId::new(leader.committed_vote.clone(), membership_log_id.clone()); + let (event_watcher, entries_tx) = self.new_event_watcher(); + ReplicationCore::::spawn( target.clone(), session_id, @@ -876,10 +883,22 @@ where network, self.log_store.get_log_reader().await, self.tx_notification.clone(), + event_watcher, + entries_tx, tracing::span!(parent: &self.span, Level::DEBUG, "replication", id=display(&self.id), target=display(&target)), ) } + fn new_event_watcher(&self) -> (EventWatcher, WatchSenderOf>) { + let (entries_tx, entries_rx) = C::watch_channel(Data::default()); + let ew = EventWatcher { + entries_rx, + committed_rx: self.committed_tx.subscribe(), + }; + + (ew, entries_tx) + } + /// Remove all replication. #[tracing::instrument(level = "debug", skip_all)] pub async fn remove_all_replication(&mut self) { @@ -898,7 +917,8 @@ where let handle = s.join_handle; // Drop sender to notify the task to shutdown - drop(s.tx_repl); + drop(s.entries_tx); + drop(s.cancel_tx); tracing::debug!("joining removed replication: {}", target); let _x = handle.await; @@ -1934,11 +1954,7 @@ where self.spawn_parallel_vote_requests(&vote_req).await; } Command::ReplicateCommitted { committed } => { - for node in self.replications.values() { - let _ = node.tx_repl.send(Replicate::Committed { - committed: committed.clone(), - }); - } + self.committed_tx.send_if_greater(committed); } Command::BroadcastHeartbeat { session_id } => { self.broadcast_heartbeat(session_id); @@ -1956,7 +1972,7 @@ where } Command::Replicate { req, target } => { let node = self.replications.get(&target).expect("replication to target node exists"); - let _ = node.tx_repl.send(req); + let _ = node.entries_tx.send(req); } Command::ReplicateSnapshot { target, inflight_id } => { let node = self.replications.get(&target).expect("replication to target node exists"); diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 64fce599b..9c32ff607 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -22,7 +22,7 @@ use crate::raft::message::TransferLeaderRequest; use crate::raft_state::IOId; use crate::raft_state::IOState; use crate::replication::ReplicationSessionId; -use crate::replication::request::Replicate; +use crate::replication::request::Data; use crate::type_config::alias::LogIdOf; use crate::type_config::alias::OneshotSenderOf; use crate::type_config::alias::VoteOf; @@ -94,7 +94,7 @@ where C: RaftTypeConfig }, /// Replicate log entries to a target. - Replicate { target: C::NodeId, req: Replicate }, + Replicate { target: C::NodeId, req: Data }, /// Replicate snapshot to a target. ReplicateSnapshot { target: C::NodeId, inflight_id: InflightId }, diff --git a/openraft/src/engine/handler/leader_handler/append_entries_test.rs b/openraft/src/engine/handler/leader_handler/append_entries_test.rs index 8a87cdcdc..c5716e213 100644 --- a/openraft/src/engine/handler/leader_handler/append_entries_test.rs +++ b/openraft/src/engine/handler/leader_handler/append_entries_test.rs @@ -25,7 +25,7 @@ use crate::progress::entry::ProgressEntry; use crate::progress::inflight_id::InflightId; use crate::raft_state::IOId; use crate::raft_state::LogStateReader; -use crate::replication::request::Replicate; +use crate::replication::request::Data; use crate::testing::blank_ent; use crate::type_config::TypeConfigExt; use crate::utime::Leased; @@ -151,11 +151,11 @@ fn test_leader_append_entries_normal() -> anyhow::Result<()> { }, Command::Replicate { target: 2, - req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), InflightId::new(1)), + req: Data::new_logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), InflightId::new(1)), }, Command::Replicate { target: 3, - req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), InflightId::new(2)), + req: Data::new_logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), InflightId::new(2)), }, ], eng.output.take_commands() @@ -280,7 +280,7 @@ fn test_leader_append_entries_with_membership_log() -> anyhow::Result<()> { }, Command::Replicate { target: 2, - req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), InflightId::new(1)) + req: Data::new_logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), InflightId::new(1)) }, ], eng.output.take_commands() diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index 7366c0a79..ab05ac078 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -23,7 +23,7 @@ use crate::proposer::Leader; use crate::proposer::LeaderQuorumSet; use crate::raft_state::LogStateReader; use crate::raft_state::io_state::log_io_id::LogIOId; -use crate::replication::request::Replicate; +use crate::replication::request::Data; use crate::replication::response::ReplicationResult; use crate::type_config::alias::InstantOf; use crate::type_config::alias::LogIdOf; @@ -348,7 +348,7 @@ where C: RaftTypeConfig log_id_range, inflight_id, } => { - let req = Replicate::logs(log_id_range.clone(), *inflight_id); + let req = Data::new_logs(log_id_range.clone(), *inflight_id); output.push_command(Command::Replicate { target: target.clone(), req, diff --git a/openraft/src/engine/handler/vote_handler/become_leader_test.rs b/openraft/src/engine/handler/vote_handler/become_leader_test.rs index cbb10e88a..fcfb7cb58 100644 --- a/openraft/src/engine/handler/vote_handler/become_leader_test.rs +++ b/openraft/src/engine/handler/vote_handler/become_leader_test.rs @@ -18,7 +18,7 @@ use crate::log_id_range::LogIdRange; use crate::progress::entry::ProgressEntry; use crate::progress::inflight_id::InflightId; use crate::raft_state::IOId; -use crate::replication::request::Replicate; +use crate::replication::request::Data; use crate::type_config::TypeConfigExt; use crate::type_config::alias::EntryOf; use crate::utime::Leased; @@ -73,7 +73,7 @@ fn test_become_leader() -> anyhow::Result<()> { }, Command::Replicate { target: 0, - req: Replicate::logs(LogIdRange::new(None, Some(log_id(2, 1, 0))), InflightId::new(1)) + req: Data::new_logs(LogIdRange::new(None, Some(log_id(2, 1, 0))), InflightId::new(1)) } ]); diff --git a/openraft/src/engine/tests/handle_vote_resp_test.rs b/openraft/src/engine/tests/handle_vote_resp_test.rs index 1b5be3bea..a09a9a6c3 100644 --- a/openraft/src/engine/tests/handle_vote_resp_test.rs +++ b/openraft/src/engine/tests/handle_vote_resp_test.rs @@ -22,7 +22,7 @@ use crate::progress::entry::ProgressEntry; use crate::progress::inflight_id::InflightId; use crate::raft::VoteResponse; use crate::raft_state::IOId; -use crate::replication::request::Replicate; +use crate::replication::request::Data; use crate::type_config::TypeConfigExt; use crate::utime::Leased; use crate::vote::raft_vote::RaftVoteExt; @@ -219,7 +219,7 @@ fn test_handle_vote_resp_equal_vote() -> anyhow::Result<()> { }, Command::Replicate { target: 2, - req: Replicate::logs(LogIdRange::new(None, Some(log_id(2, 1, 1))), InflightId::new(1)) + req: Data::new_logs(LogIdRange::new(None, Some(log_id(2, 1, 1))), InflightId::new(1)) }, ], eng.output.take_commands() diff --git a/openraft/src/engine/tests/startup_test.rs b/openraft/src/engine/tests/startup_test.rs index 746f741c7..198f804ea 100644 --- a/openraft/src/engine/tests/startup_test.rs +++ b/openraft/src/engine/tests/startup_test.rs @@ -21,7 +21,7 @@ use crate::progress::Inflight; use crate::progress::entry::ProgressEntry; use crate::progress::inflight_id::InflightId; use crate::raft_state::IOId; -use crate::replication::request::Replicate; +use crate::replication::request::Data; use crate::type_config::TypeConfigExt; use crate::utime::Leased; use crate::vote::raft_vote::RaftVoteExt; @@ -89,7 +89,7 @@ fn test_startup_as_leader_without_logs() -> anyhow::Result<()> { }, Command::Replicate { target: 3, - req: Replicate::logs(LogIdRange::new(None, Some(log_id(2, 2, 4))), InflightId::new(1)), + req: Data::new_logs(LogIdRange::new(None, Some(log_id(2, 2, 4))), InflightId::new(1)), } ], eng.output.take_commands() @@ -137,7 +137,7 @@ fn test_startup_as_leader_with_proposed_logs() -> anyhow::Result<()> { }, Command::Replicate { target: 3, - req: Replicate::logs(LogIdRange::new(None, Some(log_id(1, 2, 6))), InflightId::new(1)) + req: Data::new_logs(LogIdRange::new(None, Some(log_id(1, 2, 6))), InflightId::new(1)) } ], eng.output.take_commands() diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 5da8464b4..bc6c62bf8 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -450,6 +450,8 @@ where C: RaftTypeConfig sm_span, ); + let (committed_tx, committed_rx) = C::watch_channel(None); + let core: RaftCore = RaftCore { id: id.clone(), config: config.clone(), @@ -475,6 +477,8 @@ where C: RaftTypeConfig tx_io_completed, + committed_tx, + _committed_rx: committed_rx, tx_metrics, tx_data_metrics, tx_server_metrics, diff --git a/openraft/src/replication/event_watcher.rs b/openraft/src/replication/event_watcher.rs new file mode 100644 index 000000000..43b21afe5 --- /dev/null +++ b/openraft/src/replication/event_watcher.rs @@ -0,0 +1,41 @@ +use futures::FutureExt; + +use crate::RaftTypeConfig; +use crate::async_runtime::watch::RecvError; +use crate::async_runtime::watch::WatchReceiver; +use crate::replication::request::Data; +use crate::replication::request::Replicate; +use crate::type_config::alias::LogIdOf; +use crate::type_config::alias::WatchReceiverOf; + +#[derive(Clone)] +pub(crate) struct EventWatcher +where C: RaftTypeConfig +{ + pub(crate) entries_rx: WatchReceiverOf>, + pub(crate) committed_rx: WatchReceiverOf>>, +} + +impl EventWatcher +where C: RaftTypeConfig +{ + pub(crate) async fn recv(&mut self) -> Result, RecvError> { + let entries = self.entries_rx.changed(); + let committed = self.committed_rx.changed(); + + futures::select! { + entries_res = entries.fuse() => { + entries_res?; + + let data = self.entries_rx.borrow_watched().clone(); + Ok(Replicate::Data {data}) + } + committed_res = committed.fuse() => { + committed_res?; + + let committed = self.committed_rx.borrow_watched().clone(); + Ok(Replicate::Committed {committed}) + } + } + } +} diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index b5afe4657..c2d06e26d 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -1,5 +1,6 @@ //! Replication stream. +pub(crate) mod event_watcher; pub(crate) mod inflight_append; pub(crate) mod inflight_append_queue; pub(crate) mod log_state; @@ -18,7 +19,6 @@ use std::sync::Arc; use std::time::Duration; use futures::StreamExt; -use futures::future::FutureExt; use replication_handle::ReplicationHandle; pub(crate) use replication_session_id::ReplicationSessionId; use replication_state::ReplicationState; @@ -31,7 +31,6 @@ use tracing_futures::Instrument; use crate::RaftNetworkFactory; use crate::RaftTypeConfig; -use crate::async_runtime::MpscUnboundedReceiver; use crate::async_runtime::Mutex; use crate::base::BoxStream; use crate::config::Config; @@ -47,6 +46,7 @@ use crate::network::v2::RaftNetworkV2; use crate::progress::inflight_id::InflightId; use crate::raft::AppendEntriesRequest; use crate::raft::StreamAppendError; +use crate::replication::event_watcher::EventWatcher; use crate::replication::inflight_append_queue::InflightAppendQueue; use crate::replication::log_state::LogState; use crate::replication::replication_context::ReplicationContext; @@ -56,8 +56,8 @@ use crate::type_config::TypeConfigExt; use crate::type_config::alias::InstantOf; use crate::type_config::alias::LogIdOf; use crate::type_config::alias::MpscSenderOf; -use crate::type_config::alias::MpscUnboundedReceiverOf; use crate::type_config::alias::MutexOf; +use crate::type_config::alias::WatchSenderOf; use crate::type_config::async_runtime::mpsc::MpscSender; /// A task responsible for sending replication events to a target follower in the Raft cluster. @@ -78,10 +78,10 @@ where stream_state: Arc>>, /// A channel for receiving events from the RaftCore and snapshot transmitting task. - rx_event: MpscUnboundedReceiverOf>, + rx_event: EventWatcher, /// The next replication action to execute, set when partially completed. - next_action: Option>, + next_action: Option>, /// Identifies the current in-flight replication batch for progress tracking. inflight_id: Option, @@ -115,7 +115,9 @@ where matching: Option>, network: N::Network, log_reader: LS::LogReader, - tx_raft_core: MpscSenderOf>, + notification_tx: MpscSenderOf>, + event_watcher: EventWatcher, + entries_tx: WatchSenderOf>, span: tracing::Span, ) -> ReplicationHandle { tracing::debug!( @@ -126,8 +128,6 @@ where "spawn replication" ); - // Another component to ReplicationStream - let (tx_event, rx_event) = C::mpsc_unbounded(); let (cancel_tx, cancel_rx) = C::watch_channel(()); let id = session_id.leader_vote.node_id().clone(); @@ -139,7 +139,7 @@ where target, session_id: session_id.clone(), config, - tx_notify: tx_raft_core, + tx_notify: notification_tx, cancel_rx, }; @@ -154,7 +154,7 @@ where backoff: backoff.clone(), })), inflight_id: None, - rx_event, + rx_event: event_watcher, network: Some(network), replication_state: ReplicationState { stream_id: 0, @@ -175,9 +175,9 @@ where ReplicationHandle { session_id, join_handle, - tx_repl: tx_event, + entries_tx, snapshot_transmit_handle: None, - _cancel_tx: cancel_tx, + cancel_tx, } } @@ -223,7 +223,7 @@ where } if self.next_action.is_none() { - self.drain_events().await?; + self.next_action = Some(self.drain_events().await?); } let action = self.next_action.take().unwrap(); @@ -231,15 +231,13 @@ where self.inflight_id = action.inflight_id(); let mut log_id_range = match action { - Data::Committed => { - let m = self.replication_state.remote.last.clone(); + Replicate::Committed { committed } => { + self.replication_state.local.committed = committed.clone(); + let m = self.replication_state.remote.last.clone(); LogIdRange::new(m.clone(), m) } - Data::Logs { - inflight_id: _, - log_id_range, - } => log_id_range, + Replicate::Data { data } => data.log_id_range.clone(), }; { @@ -348,7 +346,7 @@ where // if partial success is returned, not all data is exhausted. keep sending log_id_range.prev = self.replication_state.remote.last.clone(); if log_id_range.len() > 0 { - self.next_action = Some(Data::new_logs(log_id_range, self.inflight_id.unwrap())); + self.next_action = Some(Replicate::logs(log_id_range, self.inflight_id.unwrap())); } } } @@ -459,77 +457,15 @@ where /// /// It blocks until at least one event is received. #[tracing::instrument(level = "trace", skip_all)] - pub async fn drain_events(&mut self) -> Result<(), ReplicationClosed> { + pub async fn drain_events(&mut self) -> Result, ReplicationClosed> { tracing::debug!("drain_events"); - // If there is next action to run, do not block waiting for events, - // instead, just try the best to drain all events. - if self.next_action.is_none() { - let event = - self.rx_event.recv().await.ok_or(ReplicationClosed::new("rx_repl is closed in drain_event()"))?; - self.process_event(event); - } - - // Returning from process_event(), next_action is never None. - - self.try_drain_events().await?; - - Ok(()) - } - - #[tracing::instrument(level = "trace", skip(self))] - pub async fn try_drain_events(&mut self) -> Result<(), ReplicationClosed> { - tracing::debug!("{}", func_name!()); - - // Just drain all events in the channel. - // There should NOT be more than one `Replicate::Data` event in the channel. - // Looping it just collect all commit events and heartbeat events. - loop { - let maybe_res = self.rx_event.recv().now_or_never(); - - let Some(recv_res) = maybe_res else { - // No more event found in self.repl_rx - return Ok(()); - }; - - let event = recv_res.ok_or(ReplicationClosed::new("rx_repl is closed in try_drain_event"))?; - - self.process_event(event); - } - } + let event = self + .rx_event + .recv() + .await + .map_err(|_e| ReplicationClosed::new("EventWatcher is closed in drain_event()"))?; - #[tracing::instrument(level = "trace", skip_all)] - pub fn process_event(&mut self, event: Replicate) { - tracing::debug!(event = display(&event), "process_event"); - - match event { - Replicate::Committed { committed: c } => { - // RaftCore may send a committed equals to the initial value. - debug_assert!( - c >= self.replication_state.local.committed, - "expect new committed {} > self.committed {}", - c.display(), - self.replication_state.local.committed.display() - ); - - self.replication_state.local.committed = c; - - // If there is no action, fill in an heartbeat action to send committed index. - if self.next_action.is_none() { - self.next_action = Some(Data::new_committed()); - } - } - Replicate::Data { data: d } => { - // TODO: Currently there is at most 1 in flight data. But in future RaftCore may send next data - // actions without waiting for the previous to finish. - debug_assert!( - !self.next_action.as_ref().map(|d| d.has_payload()).unwrap_or(false), - "there cannot be two actions with payload in flight, curr: {}", - self.next_action.as_ref().map(|d| d.to_string()).display() - ); - - self.next_action = Some(d); - } - } + Ok(event) } } diff --git a/openraft/src/replication/replication_handle.rs b/openraft/src/replication/replication_handle.rs index ef6f44a7d..a049572e6 100644 --- a/openraft/src/replication/replication_handle.rs +++ b/openraft/src/replication/replication_handle.rs @@ -1,10 +1,9 @@ use crate::RaftTypeConfig; use crate::error::ReplicationClosed; use crate::replication::ReplicationSessionId; -use crate::replication::request::Replicate; +use crate::replication::request::Data; use crate::replication::snapshot_transmitter_handle::SnapshotTransmitterHandle; use crate::type_config::alias::JoinHandleOf; -use crate::type_config::alias::MpscUnboundedSenderOf; use crate::type_config::alias::WatchSenderOf; /// The handle to a spawned replication stream. @@ -18,11 +17,11 @@ where C: RaftTypeConfig pub(crate) join_handle: JoinHandleOf>, /// The channel used for communicating with the replication task. - pub(crate) tx_repl: MpscUnboundedSenderOf>, + pub(crate) entries_tx: WatchSenderOf>, /// Handle to the snapshot transmitter task, if one is running. pub(crate) snapshot_transmit_handle: Option>, /// Sender for the cancellation signal; dropping this stops replication. - pub(crate) _cancel_tx: WatchSenderOf, + pub(crate) cancel_tx: WatchSenderOf, } diff --git a/openraft/src/replication/request.rs b/openraft/src/replication/request.rs index 7c20e9f39..67296b954 100644 --- a/openraft/src/replication/request.rs +++ b/openraft/src/replication/request.rs @@ -23,6 +23,13 @@ where C: RaftTypeConfig data: Data::new_logs(log_id_range, inflight_id), } } + + pub(crate) fn inflight_id(&self) -> Option { + match self { + Replicate::Committed { .. } => None, + Replicate::Data { data } => Some(data.inflight_id), + } + } } impl fmt::Display for Replicate { @@ -53,86 +60,42 @@ use crate::progress::inflight_id::InflightId; /// `Inflight` record on the leader, identified by an `InflightId`. The follower's response /// carries the same `InflightId` so the leader can match the response to the correct inflight /// state. -#[derive(PartialEq, Eq)] -pub(crate) enum Data +#[derive(PartialEq, Eq, Clone, Debug)] +pub(crate) struct Data where C: RaftTypeConfig { - Committed, - Logs { - inflight_id: InflightId, - log_id_range: LogIdRange, - }, + pub(crate) inflight_id: InflightId, + pub(crate) log_id_range: LogIdRange, } -impl fmt::Debug for Data +impl Default for Data where C: RaftTypeConfig { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Data::Committed => { - write!(f, "Data::Committed") - } - Self::Logs { - inflight_id, - log_id_range, - } => f - .debug_struct("Data::Logs") - .field("log_id_range", log_id_range) - .field("inflight_id", inflight_id) - .finish(), + fn default() -> Self { + Data { + inflight_id: InflightId::new(0), + log_id_range: LogIdRange::new(None, None), } } } impl fmt::Display for Data { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Data::Committed => { - write!(f, "Committed") - } - Self::Logs { - inflight_id, - log_id_range, - } => { - write!( - f, - "Logs{{log_id_range: {}, inflight_id: {}}}", - log_id_range, inflight_id - ) - } - } + write!( + f, + "Data{{log_id_range: {}, inflight_id: {}}}", + self.log_id_range, self.inflight_id + ) } } impl Data where C: RaftTypeConfig { - pub(crate) fn new_committed() -> Self { - Self::Committed - } - pub(crate) fn new_logs(log_id_range: LogIdRange, inflight_id: InflightId) -> Self { - Self::Logs { + Self { log_id_range, inflight_id, } } - - /// Returns the inflight ID if this is a log replication request. - /// - /// Returns `None` for commit-only updates (heartbeats). - pub(crate) fn inflight_id(&self) -> Option { - match self { - Data::Committed => None, - Data::Logs { inflight_id, .. } => Some(*inflight_id), - } - } - - /// Return true if the data includes any payload, i.e., not a heartbeat. - pub(crate) fn has_payload(&self) -> bool { - match self { - Self::Committed => false, - Self::Logs { .. } => true, - } - } }