@@ -6,12 +6,11 @@ use parking_lot::{Condvar, Mutex};
 use crate::daemon_env::ErrorKind;
 use crate::heartbeats::HEARTBEAT_INTERVAL;
 use crate::peer_progress::PeerProgress;
-use crate::term_marker::TermMarker;
+use crate::remote_context::RemoteContext;
 use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
-use crate::verify_authority::DaemonBeatTicker;
 use crate::{
     check_or_record, AppendEntriesArgs, Index, IndexTerm, InstallSnapshotArgs,
-    Peer, Raft, RaftState, RemoteRaft, ReplicableCommand, Term,
+    Peer, Raft, RaftState, ReplicableCommand, Term,
 };
 
 #[derive(Eq, PartialEq)]
@@ -132,9 +131,9 @@ impl<Command: ReplicableCommand> Raft<Command> {
             if !this.inner_state.lock().is_leader() {
                 continue;
             }
-            for (i, rpc_client) in this.peers.iter().enumerate() {
-                if i != this.me.0 && event.should_schedule(Peer(i)) {
-                    let progress = &peer_progress[i];
+            for peer in this.peers.clone().into_iter() {
+                if peer != this.me && event.should_schedule(peer) {
+                    let progress = &peer_progress[peer.0];
                     if let Event::NewTerm(_term, index) = event {
                         progress.reset_progress(index);
                     }
@@ -144,12 +143,9 @@ impl<Command: ReplicableCommand> Raft<Command> {
                     task_number += 1;
                     this.thread_pool.spawn(Self::sync_log_entries(
                         this.inner_state.clone(),
-                        rpc_client.clone(),
                         this.sync_log_entries_comms.clone(),
                         progress.clone(),
                         this.apply_command_signal.clone(),
-                        this.term_marker(),
-                        this.beat_ticker(i),
                         TaskNumber(task_number),
                     ));
                 }
@@ -201,15 +197,11 @@ impl<Command: ReplicableCommand> Raft<Command> {
     /// failure of the last case, we will never hit the other failure again,
     /// since in the last case we always sync log entry at a committed index,
     /// and a committed log entry can never diverge.
-    #[allow(clippy::too_many_arguments)]
     async fn sync_log_entries(
         rf: Arc<Mutex<RaftState<Command>>>,
-        rpc_client: impl RemoteRaft<Command>,
         comms: SyncLogEntriesComms,
         progress: PeerProgress,
         apply_command_signal: Arc<Condvar>,
-        term_marker: TermMarker<Command>,
-        beat_ticker: DaemonBeatTicker,
         task_number: TaskNumber,
     ) {
         if !progress.take_task() {
@@ -224,8 +216,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 let term = args.term;
                 let prev_log_index = args.prev_log_index;
                 let match_index = args.prev_log_index + args.entries.len();
-                let succeeded =
-                    Self::append_entries(&rpc_client, args, beat_ticker).await;
+                let succeeded = Self::append_entries(peer, args).await;
 
                 (term, prev_log_index, match_index, succeeded)
             }
@@ -233,9 +224,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 let term = args.term;
                 let prev_log_index = args.last_included_index;
                 let match_index = args.last_included_index;
-                let succeeded =
-                    Self::install_snapshot(&rpc_client, args, beat_ticker)
-                        .await;
+                let succeeded = Self::install_snapshot(peer, args).await;
 
                 (term, prev_log_index, match_index, succeeded)
             }
@@ -367,7 +356,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             }
             // Do nothing, not our term anymore.
             Ok(SyncLogEntriesResult::TermElapsed(term)) => {
-                term_marker.mark(term);
+                RemoteContext::<Command>::term_marker().mark(term);
             }
             Err(_) => {
                 tokio::time::sleep(HEARTBEAT_INTERVAL).await;
@@ -479,10 +468,12 @@ impl<Command: ReplicableCommand> Raft<Command> {
 
     const APPEND_ENTRIES_RETRY: usize = 1;
     async fn append_entries(
-        rpc_client: &dyn RemoteRaft<Command>,
+        peer: Peer,
         args: AppendEntriesArgs<Command>,
-        beat_ticker: DaemonBeatTicker,
     ) -> std::io::Result<SyncLogEntriesResult> {
+        let beat_ticker = RemoteContext::<Command>::beat_ticker(peer);
+        let rpc_client = RemoteContext::<Command>::rpc_client(peer);
+
         let term = args.term;
         let beat = beat_ticker.next_beat();
         let reply = retry_rpc(
@@ -522,10 +513,12 @@ impl<Command: ReplicableCommand> Raft<Command> {
 
     const INSTALL_SNAPSHOT_RETRY: usize = 1;
     async fn install_snapshot(
-        rpc_client: &dyn RemoteRaft<Command>,
+        peer: Peer,
         args: InstallSnapshotArgs,
-        beat_ticker: DaemonBeatTicker,
     ) -> std::io::Result<SyncLogEntriesResult> {
+        let beat_ticker = RemoteContext::<Command>::beat_ticker(peer);
+        let rpc_client = RemoteContext::<Command>::rpc_client(peer);
+
         let term = args.term;
         let beat = beat_ticker.next_beat();
         let reply = retry_rpc(
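
Note: the new remote_context.rs module is not part of these hunks; the diff only shows three accessors being called, RemoteContext::<Command>::term_marker(), RemoteContext::<Command>::beat_ticker(peer), and RemoteContext::<Command>::rpc_client(peer). The sketch below is a minimal, non-generic illustration of that accessor surface, assuming the context is installed once into a process-wide OnceLock; the placeholder types (TermMarker, DaemonBeatTicker, RpcClient) and the storage strategy are assumptions for illustration, not the crate's actual definitions.

// Minimal sketch of the assumed RemoteContext accessor surface; not the real
// remote_context.rs. Placeholder types stand in for the crate's TermMarker,
// DaemonBeatTicker and RemoteRaft client; storing the context in a OnceLock
// is an assumption.
use std::sync::{Arc, OnceLock};

struct TermMarker;       // placeholder for crate::term_marker::TermMarker
struct DaemonBeatTicker; // placeholder for crate::verify_authority::DaemonBeatTicker
struct RpcClient;        // placeholder for a RemoteRaft implementation

#[derive(Clone, Copy, PartialEq, Eq)]
struct Peer(usize);

struct RemoteContext {
    term_marker: TermMarker,
    beat_tickers: Vec<Arc<DaemonBeatTicker>>,
    rpc_clients: Vec<Arc<RpcClient>>,
}

// One context per process, installed before any sync_log_entries task runs.
static CONTEXT: OnceLock<RemoteContext> = OnceLock::new();

impl RemoteContext {
    /// Installs the context; later calls are ignored.
    fn attach(ctx: RemoteContext) {
        let _ = CONTEXT.set(ctx);
    }

    fn get() -> &'static RemoteContext {
        CONTEXT.get().expect("RemoteContext must be attached before use")
    }

    // Mirrors RemoteContext::<Command>::term_marker() in the diff.
    fn term_marker() -> &'static TermMarker {
        &Self::get().term_marker
    }

    // Mirrors RemoteContext::<Command>::beat_ticker(peer) in the diff.
    fn beat_ticker(peer: Peer) -> Arc<DaemonBeatTicker> {
        Self::get().beat_tickers[peer.0].clone()
    }

    // Mirrors RemoteContext::<Command>::rpc_client(peer) in the diff.
    fn rpc_client(peer: Peer) -> Arc<RpcClient> {
        Self::get().rpc_clients[peer.0].clone()
    }
}

Looking up the per-peer RPC client and beat ticker through such a context is what lets the diff drop rpc_client, term_marker, and beat_ticker from the sync_log_entries parameter list, along with the #[allow(clippy::too_many_arguments)] that the long signature previously needed.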