Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::Membership;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::async_runtime::MpscReceiver;
use crate::async_runtime::MpscUnboundedSender;
use crate::async_runtime::OneshotSender;
use crate::async_runtime::TryRecvError;
use crate::async_runtime::watch::WatchSender;
Expand Down Expand Up @@ -92,9 +91,10 @@ use crate::raft_state::io_state::io_id::IOId;
use crate::raft_state::io_state::log_io_id::LogIOId;
use crate::replication::ReplicationCore;
use crate::replication::ReplicationSessionId;
use crate::replication::event_watcher::EventWatcher;
use crate::replication::replication_context::ReplicationContext;
use crate::replication::replication_handle::ReplicationHandle;
use crate::replication::request::Replicate;
use crate::replication::request::Data;
use crate::replication::snapshot_transmitter::SnapshotTransmitter;
use crate::runtime::RaftRuntime;
use crate::storage::IOFlushed;
Expand All @@ -106,6 +106,7 @@ use crate::type_config::alias::MpscReceiverOf;
use crate::type_config::alias::MpscSenderOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::VoteOf;
use crate::type_config::alias::WatchReceiverOf;
use crate::type_config::alias::WatchSenderOf;
use crate::type_config::alias::WriteResponderOf;
use crate::type_config::async_runtime::mpsc::MpscSender;
Expand Down Expand Up @@ -197,6 +198,10 @@ where
/// This is used by IOFlushed callbacks to report IO completion in a synchronous manner.
pub(crate) tx_io_completed: WatchSenderOf<C, Result<IOId<C>, StorageError<C>>>,

/// For broadcast committed log id to replication task.
pub(crate) committed_tx: WatchSenderOf<C, Option<LogIdOf<C>>>,
pub(crate) _committed_rx: WatchReceiverOf<C, Option<LogIdOf<C>>>,

pub(crate) tx_metrics: WatchSenderOf<C, RaftMetrics<C>>,
pub(crate) tx_data_metrics: WatchSenderOf<C, RaftDataMetrics<C>>,
pub(crate) tx_server_metrics: WatchSenderOf<C, RaftServerMetrics<C>>,
Expand Down Expand Up @@ -867,6 +872,8 @@ where

let session_id = ReplicationSessionId::new(leader.committed_vote.clone(), membership_log_id.clone());

let (event_watcher, entries_tx) = self.new_event_watcher();

ReplicationCore::<C, NF, LS>::spawn(
target.clone(),
session_id,
Expand All @@ -876,10 +883,22 @@ where
network,
self.log_store.get_log_reader().await,
self.tx_notification.clone(),
event_watcher,
entries_tx,
tracing::span!(parent: &self.span, Level::DEBUG, "replication", id=display(&self.id), target=display(&target)),
)
}

fn new_event_watcher(&self) -> (EventWatcher<C>, WatchSenderOf<C, Data<C>>) {
let (entries_tx, entries_rx) = C::watch_channel(Data::default());
let ew = EventWatcher {
entries_rx,
committed_rx: self.committed_tx.subscribe(),
};

(ew, entries_tx)
}

/// Remove all replication.
#[tracing::instrument(level = "debug", skip_all)]
pub async fn remove_all_replication(&mut self) {
Expand All @@ -898,7 +917,8 @@ where
let handle = s.join_handle;

// Drop sender to notify the task to shutdown
drop(s.tx_repl);
drop(s.entries_tx);
drop(s.cancel_tx);

tracing::debug!("joining removed replication: {}", target);
let _x = handle.await;
Expand Down Expand Up @@ -1934,11 +1954,7 @@ where
self.spawn_parallel_vote_requests(&vote_req).await;
}
Command::ReplicateCommitted { committed } => {
for node in self.replications.values() {
let _ = node.tx_repl.send(Replicate::Committed {
committed: committed.clone(),
});
}
self.committed_tx.send_if_greater(committed);
}
Command::BroadcastHeartbeat { session_id } => {
self.broadcast_heartbeat(session_id);
Expand All @@ -1956,7 +1972,7 @@ where
}
Command::Replicate { req, target } => {
let node = self.replications.get(&target).expect("replication to target node exists");
let _ = node.tx_repl.send(req);
let _ = node.entries_tx.send(req);
}
Command::ReplicateSnapshot { target, inflight_id } => {
let node = self.replications.get(&target).expect("replication to target node exists");
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::raft::message::TransferLeaderRequest;
use crate::raft_state::IOId;
use crate::raft_state::IOState;
use crate::replication::ReplicationSessionId;
use crate::replication::request::Replicate;
use crate::replication::request::Data;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::VoteOf;
Expand Down Expand Up @@ -94,7 +94,7 @@ where C: RaftTypeConfig
},

/// Replicate log entries to a target.
Replicate { target: C::NodeId, req: Replicate<C> },
Replicate { target: C::NodeId, req: Data<C> },

/// Replicate snapshot to a target.
ReplicateSnapshot { target: C::NodeId, inflight_id: InflightId },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::progress::entry::ProgressEntry;
use crate::progress::inflight_id::InflightId;
use crate::raft_state::IOId;
use crate::raft_state::LogStateReader;
use crate::replication::request::Replicate;
use crate::replication::request::Data;
use crate::testing::blank_ent;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
Expand Down Expand Up @@ -151,11 +151,11 @@ fn test_leader_append_entries_normal() -> anyhow::Result<()> {
},
Command::Replicate {
target: 2,
req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), InflightId::new(1)),
req: Data::new_logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), InflightId::new(1)),
},
Command::Replicate {
target: 3,
req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), InflightId::new(2)),
req: Data::new_logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), InflightId::new(2)),
},
],
eng.output.take_commands()
Expand Down Expand Up @@ -280,7 +280,7 @@ fn test_leader_append_entries_with_membership_log() -> anyhow::Result<()> {
},
Command::Replicate {
target: 2,
req: Replicate::logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), InflightId::new(1))
req: Data::new_logs(LogIdRange::new(None, Some(log_id(3, 1, 6))), InflightId::new(1))
},
],
eng.output.take_commands()
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::proposer::Leader;
use crate::proposer::LeaderQuorumSet;
use crate::raft_state::LogStateReader;
use crate::raft_state::io_state::log_io_id::LogIOId;
use crate::replication::request::Replicate;
use crate::replication::request::Data;
use crate::replication::response::ReplicationResult;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::LogIdOf;
Expand Down Expand Up @@ -348,7 +348,7 @@ where C: RaftTypeConfig
log_id_range,
inflight_id,
} => {
let req = Replicate::logs(log_id_range.clone(), *inflight_id);
let req = Data::new_logs(log_id_range.clone(), *inflight_id);
output.push_command(Command::Replicate {
target: target.clone(),
req,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::log_id_range::LogIdRange;
use crate::progress::entry::ProgressEntry;
use crate::progress::inflight_id::InflightId;
use crate::raft_state::IOId;
use crate::replication::request::Replicate;
use crate::replication::request::Data;
use crate::type_config::TypeConfigExt;
use crate::type_config::alias::EntryOf;
use crate::utime::Leased;
Expand Down Expand Up @@ -73,7 +73,7 @@ fn test_become_leader() -> anyhow::Result<()> {
},
Command::Replicate {
target: 0,
req: Replicate::logs(LogIdRange::new(None, Some(log_id(2, 1, 0))), InflightId::new(1))
req: Data::new_logs(LogIdRange::new(None, Some(log_id(2, 1, 0))), InflightId::new(1))
}
]);

Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/tests/handle_vote_resp_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::progress::entry::ProgressEntry;
use crate::progress::inflight_id::InflightId;
use crate::raft::VoteResponse;
use crate::raft_state::IOId;
use crate::replication::request::Replicate;
use crate::replication::request::Data;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::raft_vote::RaftVoteExt;
Expand Down Expand Up @@ -219,7 +219,7 @@ fn test_handle_vote_resp_equal_vote() -> anyhow::Result<()> {
},
Command::Replicate {
target: 2,
req: Replicate::logs(LogIdRange::new(None, Some(log_id(2, 1, 1))), InflightId::new(1))
req: Data::new_logs(LogIdRange::new(None, Some(log_id(2, 1, 1))), InflightId::new(1))
},
],
eng.output.take_commands()
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/engine/tests/startup_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::progress::Inflight;
use crate::progress::entry::ProgressEntry;
use crate::progress::inflight_id::InflightId;
use crate::raft_state::IOId;
use crate::replication::request::Replicate;
use crate::replication::request::Data;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::raft_vote::RaftVoteExt;
Expand Down Expand Up @@ -89,7 +89,7 @@ fn test_startup_as_leader_without_logs() -> anyhow::Result<()> {
},
Command::Replicate {
target: 3,
req: Replicate::logs(LogIdRange::new(None, Some(log_id(2, 2, 4))), InflightId::new(1)),
req: Data::new_logs(LogIdRange::new(None, Some(log_id(2, 2, 4))), InflightId::new(1)),
}
],
eng.output.take_commands()
Expand Down Expand Up @@ -137,7 +137,7 @@ fn test_startup_as_leader_with_proposed_logs() -> anyhow::Result<()> {
},
Command::Replicate {
target: 3,
req: Replicate::logs(LogIdRange::new(None, Some(log_id(1, 2, 6))), InflightId::new(1))
req: Data::new_logs(LogIdRange::new(None, Some(log_id(1, 2, 6))), InflightId::new(1))
}
],
eng.output.take_commands()
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,8 @@ where C: RaftTypeConfig
sm_span,
);

let (committed_tx, committed_rx) = C::watch_channel(None);

let core: RaftCore<C, N, LS> = RaftCore {
id: id.clone(),
config: config.clone(),
Expand All @@ -475,6 +477,8 @@ where C: RaftTypeConfig

tx_io_completed,

committed_tx,
_committed_rx: committed_rx,
tx_metrics,
tx_data_metrics,
tx_server_metrics,
Expand Down
41 changes: 41 additions & 0 deletions openraft/src/replication/event_watcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use futures::FutureExt;

use crate::RaftTypeConfig;
use crate::async_runtime::watch::RecvError;
use crate::async_runtime::watch::WatchReceiver;
use crate::replication::request::Data;
use crate::replication::request::Replicate;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::WatchReceiverOf;

#[derive(Clone)]
pub(crate) struct EventWatcher<C>
where C: RaftTypeConfig
{
pub(crate) entries_rx: WatchReceiverOf<C, Data<C>>,
pub(crate) committed_rx: WatchReceiverOf<C, Option<LogIdOf<C>>>,
}

impl<C> EventWatcher<C>
where C: RaftTypeConfig
{
pub(crate) async fn recv(&mut self) -> Result<Replicate<C>, RecvError> {
let entries = self.entries_rx.changed();
let committed = self.committed_rx.changed();

futures::select! {
entries_res = entries.fuse() => {
entries_res?;

let data = self.entries_rx.borrow_watched().clone();
Ok(Replicate::Data {data})
}
committed_res = committed.fuse() => {
committed_res?;

let committed = self.committed_rx.borrow_watched().clone();
Ok(Replicate::Committed {committed})
}
}
}
}
Loading