Просмотр исходного кода

Move next_index and current_step out of RaftState.

Plus other refactors in sync_log_entries.rs.
Jing Yang 3 лет назад
Родитель
Commit
f34f697280
6 измененных файлов с 209 добавлено и 93 удалено
  1. 6 14
      src/election.rs
  2. 1 0
      src/lib.rs
  3. 82 0
      src/peer_progress.rs
  4. 8 5
      src/raft.rs
  5. 0 4
      src/raft_state.rs
  6. 112 70
      src/sync_log_entries.rs

+ 6 - 14
src/election.rs

@@ -6,8 +6,9 @@ use parking_lot::{Condvar, Mutex};
 use rand::{thread_rng, Rng};
 use rand::{thread_rng, Rng};
 
 
 use crate::daemon_env::Daemon;
 use crate::daemon_env::Daemon;
+use crate::sync_log_entries::SyncLogEntriesComms;
 use crate::term_marker::TermMarker;
 use crate::term_marker::TermMarker;
-use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
+use crate::utils::{retry_rpc, RPC_DEADLINE};
 use crate::verify_authority::VerifyAuthorityDaemon;
 use crate::verify_authority::VerifyAuthorityDaemon;
 use crate::{
 use crate::{
     Peer, Persister, Raft, RaftState, RemoteRaft, ReplicableCommand,
     Peer, Persister, Raft, RaftState, RemoteRaft, ReplicableCommand,
@@ -270,7 +271,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             votes,
             votes,
             rx,
             rx,
             self.election.clone(),
             self.election.clone(),
-            self.new_log_entry.clone().unwrap(),
+            self.sync_log_entries_comms.clone(),
             self.verify_authority_daemon.clone(),
             self.verify_authority_daemon.clone(),
             self.persister.clone(),
             self.persister.clone(),
         ));
         ));
@@ -306,7 +307,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
         votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
         cancel_token: futures_channel::oneshot::Receiver<()>,
         cancel_token: futures_channel::oneshot::Receiver<()>,
         election: Arc<ElectionState>,
         election: Arc<ElectionState>,
-        new_log_entry: SharedSender<Option<Peer>>,
+        new_log_entry: SyncLogEntriesComms,
         verify_authority_daemon: VerifyAuthorityDaemon,
         verify_authority_daemon: VerifyAuthorityDaemon,
         persister: Arc<dyn Persister>,
         persister: Arc<dyn Persister>,
     ) {
     ) {
@@ -352,16 +353,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
 
 
             rf.state = State::Leader;
             rf.state = State::Leader;
             rf.leader_id = me;
             rf.leader_id = me;
-            let log_len = rf.log.end();
-            for item in rf.next_index.iter_mut() {
-                *item = log_len;
-            }
-            for item in rf.match_index.iter_mut() {
-                *item = 0;
-            }
-            for item in rf.current_step.iter_mut() {
-                *item = 0;
-            }
+            rf.match_index.fill(0);
 
 
             let sentinel_commit_index;
             let sentinel_commit_index;
             if rf.commit_index != rf.log.last_index_term().index {
             if rf.commit_index != rf.log.last_index_term().index {
@@ -380,7 +372,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             verify_authority_daemon.reset_state(term, sentinel_commit_index);
             verify_authority_daemon.reset_state(term, sentinel_commit_index);
 
 
             // Sync all logs now.
             // Sync all logs now.
-            let _ = new_log_entry.send(None);
+            new_log_entry.reset_progress(term, sentinel_commit_index);
         }
         }
     }
     }
 }
 }

+ 1 - 0
src/lib.rs

@@ -21,6 +21,7 @@ mod heartbeats;
 mod index_term;
 mod index_term;
 mod log_array;
 mod log_array;
 mod messages;
 mod messages;
+mod peer_progress;
 mod persister;
 mod persister;
 mod process_append_entries;
 mod process_append_entries;
 mod process_install_snapshot;
 mod process_install_snapshot;

+ 82 - 0
src/peer_progress.rs

@@ -0,0 +1,82 @@
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+
+use parking_lot::Mutex;
+
+use crate::{Index, Peer};
+
+struct SharedIndexes {
+    next_index: Index,
+    current_step: i64,
+}
+
+struct SharedProgress {
+    opening: AtomicUsize,
+    indexes: Mutex<SharedIndexes>,
+}
+
+#[derive(Clone)]
+#[repr(align(64))]
+pub(crate) struct PeerProgress {
+    pub peer: Peer,
+    internal: Arc<SharedProgress>,
+}
+
+impl PeerProgress {
+    pub fn create(peer_index: usize) -> Self {
+        Self {
+            peer: Peer(peer_index),
+            internal: Arc::new(SharedProgress {
+                opening: AtomicUsize::new(0),
+                indexes: Mutex::new(SharedIndexes {
+                    next_index: 1,
+                    current_step: 0,
+                }),
+            }),
+        }
+    }
+
+    pub fn should_schedule(&self) -> bool {
+        self.internal.opening.fetch_add(1, Ordering::AcqRel) == 0
+    }
+
+    pub fn take_task(&self) -> bool {
+        self.internal.opening.swap(0, Ordering::AcqRel) != 0
+    }
+
+    pub fn reset_progress(&self, next_index: Index) {
+        let mut internal = self.internal.indexes.lock();
+        internal.next_index = next_index;
+        internal.current_step = 0;
+    }
+
+    pub fn record_failure(&self, committed_index: Index) {
+        let mut internal = self.internal.indexes.lock();
+        let step = &mut internal.current_step;
+        if *step < 5 {
+            *step += 1;
+        }
+        let diff = 4 << *step;
+
+        let next_index = &mut internal.next_index;
+        if diff >= *next_index {
+            *next_index = 1usize;
+        } else {
+            *next_index -= diff;
+        }
+
+        if *next_index < committed_index {
+            *next_index = committed_index;
+        }
+    }
+
+    pub fn record_success(&self, match_index: Index) {
+        let mut internal = self.internal.indexes.lock();
+        internal.next_index = match_index + 1;
+        internal.current_step = 0;
+    }
+
+    pub fn next_index(&self) -> Index {
+        self.internal.indexes.lock().next_index
+    }
+}

+ 8 - 5
src/raft.rs

@@ -11,6 +11,7 @@ use crate::election::ElectionState;
 use crate::heartbeats::{HeartbeatsDaemon, HEARTBEAT_INTERVAL};
 use crate::heartbeats::{HeartbeatsDaemon, HEARTBEAT_INTERVAL};
 use crate::persister::PersistedRaftState;
 use crate::persister::PersistedRaftState;
 use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
 use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
+use crate::sync_log_entries::SyncLogEntriesComms;
 use crate::verify_authority::VerifyAuthorityDaemon;
 use crate::verify_authority::VerifyAuthorityDaemon;
 use crate::{
 use crate::{
     utils, IndexTerm, Persister, RaftState, RemoteRaft, ReplicableCommand,
     utils, IndexTerm, Persister, RaftState, RemoteRaft, ReplicableCommand,
@@ -33,7 +34,7 @@ pub struct Raft<Command> {
 
 
     pub(crate) persister: Arc<dyn Persister>,
     pub(crate) persister: Arc<dyn Persister>,
 
 
-    pub(crate) new_log_entry: Option<utils::SharedSender<Option<Peer>>>,
+    pub(crate) sync_log_entries_comms: SyncLogEntriesComms,
     pub(crate) apply_command_signal: Arc<Condvar>,
     pub(crate) apply_command_signal: Arc<Condvar>,
     pub(crate) keep_running: Arc<AtomicBool>,
     pub(crate) keep_running: Arc<AtomicBool>,
     pub(crate) election: Arc<ElectionState>,
     pub(crate) election: Arc<ElectionState>,
@@ -103,13 +104,15 @@ impl<Command: ReplicableCommand> Raft<Command> {
             .into_iter()
             .into_iter()
             .map(|r| Arc::new(r) as Arc<dyn RemoteRaft<Command>>)
             .map(|r| Arc::new(r) as Arc<dyn RemoteRaft<Command>>)
             .collect();
             .collect();
+        let (sync_log_entries_comms, sync_log_entries_daemon) =
+            crate::sync_log_entries::create(peer_size);
 
 
         let mut this = Raft {
         let mut this = Raft {
             inner_state: Arc::new(Mutex::new(state)),
             inner_state: Arc::new(Mutex::new(state)),
             peers,
             peers,
             me: Peer(me),
             me: Peer(me),
             persister,
             persister,
-            new_log_entry: None,
+            sync_log_entries_comms,
             apply_command_signal: Arc::new(Condvar::new()),
             apply_command_signal: Arc::new(Condvar::new()),
             keep_running: Arc::new(AtomicBool::new(true)),
             keep_running: Arc::new(AtomicBool::new(true)),
             election: Arc::new(election),
             election: Arc::new(election),
@@ -125,7 +128,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         // Running in a standalone thread.
         // Running in a standalone thread.
         this.run_snapshot_daemon(max_state_size_bytes, request_snapshot);
         this.run_snapshot_daemon(max_state_size_bytes, request_snapshot);
         // Running in a standalone thread.
         // Running in a standalone thread.
-        this.run_log_entry_daemon();
+        this.run_log_entry_daemon(sync_log_entries_daemon);
         // Running in a standalone thread.
         // Running in a standalone thread.
         this.run_apply_command_daemon(apply_command);
         this.run_apply_command_daemon(apply_command);
         // One off function that schedules many little tasks, running on the
         // One off function that schedules many little tasks, running on the
@@ -159,7 +162,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         self.persister.save_state(rf.persisted_state().into());
         self.persister.save_state(rf.persisted_state().into());
 
 
         // Several attempts have been made to remove the unwrap below.
         // Several attempts have been made to remove the unwrap below.
-        let _ = self.new_log_entry.as_ref().unwrap().send(None);
+        let _ = self.sync_log_entries_comms.update_followers(index);
 
 
         log::info!("{:?} started new entry at {} {:?}", self.me, index, term);
         log::info!("{:?} started new entry at {} {:?}", self.me, index, term);
         Some(IndexTerm::pack(index, term))
         Some(IndexTerm::pack(index, term))
@@ -173,7 +176,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
     pub fn kill(mut self) {
     pub fn kill(mut self) {
         self.keep_running.store(false, Ordering::Release);
         self.keep_running.store(false, Ordering::Release);
         self.election.stop_election_timer();
         self.election.stop_election_timer();
-        self.new_log_entry.take().map(|n| n.send(None));
+        self.sync_log_entries_comms.kill();
         self.apply_command_signal.notify_all();
         self.apply_command_signal.notify_all();
         self.snapshot_daemon.kill();
         self.snapshot_daemon.kill();
         self.verify_authority_daemon.kill();
         self.verify_authority_daemon.kill();

+ 0 - 4
src/raft_state.rs

@@ -18,9 +18,7 @@ pub(crate) struct RaftState<Command> {
     pub commit_index: Index,
     pub commit_index: Index,
     pub last_applied: Index,
     pub last_applied: Index,
 
 
-    pub next_index: Vec<Index>,
     pub match_index: Vec<Index>,
     pub match_index: Vec<Index>,
-    pub current_step: Vec<i64>,
 
 
     pub state: State,
     pub state: State,
 
 
@@ -35,9 +33,7 @@ impl<Command> RaftState<Command> {
             log: LogArray::create(),
             log: LogArray::create(),
             commit_index: 0,
             commit_index: 0,
             last_applied: 0,
             last_applied: 0,
-            next_index: vec![1; peer_size],
             match_index: vec![0; peer_size],
             match_index: vec![0; peer_size],
-            current_step: vec![0; peer_size],
             state: State::Follower,
             state: State::Follower,
             leader_id: me,
             leader_id: me,
         }
         }

+ 112 - 70
src/sync_log_entries.rs

@@ -1,20 +1,81 @@
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::Ordering;
 use std::sync::Arc;
 use std::sync::Arc;
 
 
 use parking_lot::{Condvar, Mutex};
 use parking_lot::{Condvar, Mutex};
 
 
 use crate::daemon_env::{Daemon, ErrorKind};
 use crate::daemon_env::{Daemon, ErrorKind};
 use crate::heartbeats::HEARTBEAT_INTERVAL;
 use crate::heartbeats::HEARTBEAT_INTERVAL;
+use crate::peer_progress::PeerProgress;
 use crate::term_marker::TermMarker;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
 use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
 use crate::verify_authority::DaemonBeatTicker;
 use crate::verify_authority::DaemonBeatTicker;
 use crate::{
 use crate::{
-    check_or_record, AppendEntriesArgs, IndexTerm, InstallSnapshotArgs, Peer,
-    Raft, RaftState, RemoteRaft, ReplicableCommand, Term,
+    check_or_record, AppendEntriesArgs, Index, IndexTerm, InstallSnapshotArgs,
+    Peer, Raft, RaftState, RemoteRaft, ReplicableCommand, Term,
 };
 };
 
 
-#[repr(align(64))]
-struct Opening(Arc<AtomicUsize>);
+#[derive(Clone, Eq, PartialEq)]
+enum Event {
+    NewTerm(Term, Index),
+    NewLogEntry(Index),
+    Rerun(Peer),
+    Shutdown,
+}
+
+impl Event {
+    fn should_schedule(&self, peer: Peer) -> bool {
+        match self {
+            Event::NewTerm(..) => true,
+            Event::NewLogEntry(_index) => true,
+            Event::Rerun(p) => p == &peer,
+            Event::Shutdown => false,
+        }
+    }
+}
+
+#[derive(Clone)]
+pub(crate) struct SyncLogEntriesComms {
+    tx: crate::utils::SharedSender<Event>,
+}
+
+impl SyncLogEntriesComms {
+    pub fn update_followers(&self, index: Index) {
+        // Ignore the error. The log syncing thread must have died.
+        let _ = self.tx.send(Event::NewLogEntry(index));
+    }
+
+    pub fn reset_progress(&self, term: Term, index: Index) {
+        let _ = self.tx.send(Event::NewTerm(term, index));
+    }
+
+    pub fn kill(&self) {
+        self.tx
+            .send(Event::Shutdown)
+            .expect("The sync log entries daemon should still be alive");
+    }
+
+    fn rerun(&self, peer: Peer) {
+        // Ignore the error. The log syncing thread must have died.
+        let _ = self.tx.send(Event::Rerun(peer));
+    }
+}
+
+pub(crate) struct SyncLogEntriesDaemon {
+    rx: std::sync::mpsc::Receiver<Event>,
+    peer_progress: Vec<PeerProgress>,
+}
+
+pub(crate) fn create(
+    peer_size: usize,
+) -> (SyncLogEntriesComms, SyncLogEntriesDaemon) {
+    let (tx, rx) = std::sync::mpsc::channel();
+    let tx = SharedSender::new(tx);
+    let peer_progress = (0..peer_size).map(PeerProgress::create).collect();
+    (
+        SyncLogEntriesComms { tx },
+        SyncLogEntriesDaemon { rx, peer_progress },
+    )
+}
 
 
 enum SyncLogEntriesOperation<Command> {
 enum SyncLogEntriesOperation<Command> {
     AppendEntries(AppendEntriesArgs<Command>),
     AppendEntries(AppendEntriesArgs<Command>),
@@ -54,24 +115,17 @@ impl<Command: ReplicableCommand> Raft<Command> {
     ///
     ///
     /// See comments on [`Raft::sync_log_entries`] to learn about the syncing
     /// See comments on [`Raft::sync_log_entries`] to learn about the syncing
     /// and backoff strategy.
     /// and backoff strategy.
-    pub(crate) fn run_log_entry_daemon(&mut self) {
-        let (tx, rx) = std::sync::mpsc::channel::<Option<Peer>>();
-        let tx = SharedSender::new(tx);
-        self.new_log_entry.replace(tx);
-
+    pub(crate) fn run_log_entry_daemon(
+        &self,
+        SyncLogEntriesDaemon { rx, peer_progress }: SyncLogEntriesDaemon,
+    ) {
         // Clone everything that the thread needs.
         // Clone everything that the thread needs.
         let this = self.clone();
         let this = self.clone();
         let sync_log_entry_daemon = move || {
         let sync_log_entry_daemon = move || {
             log::info!("{:?} sync log entries daemon running ...", this.me);
             log::info!("{:?} sync log entries daemon running ...", this.me);
 
 
-            let mut openings = vec![];
-            openings.resize_with(this.peers.len(), || {
-                Opening(Arc::new(AtomicUsize::new(0)))
-            });
-            let openings = openings; // Not mutable beyond this point.
-
             let mut task_number = 0;
             let mut task_number = 0;
-            while let Ok(peer) = rx.recv() {
+            while let Ok(event) = rx.recv() {
                 if !this.keep_running.load(Ordering::Relaxed) {
                 if !this.keep_running.load(Ordering::Relaxed) {
                     break;
                     break;
                 }
                 }
@@ -79,18 +133,20 @@ impl<Command: ReplicableCommand> Raft<Command> {
                     continue;
                     continue;
                 }
                 }
                 for (i, rpc_client) in this.peers.iter().enumerate() {
                 for (i, rpc_client) in this.peers.iter().enumerate() {
-                    if i != this.me.0 && peer.map(|p| p.0 == i).unwrap_or(true)
-                    {
+                    if i != this.me.0 && event.should_schedule(Peer(i)) {
+                        let progress = &peer_progress[i];
+                        if let Event::NewTerm(_term, index) = event {
+                            progress.reset_progress(index);
+                        }
                         // Only schedule a new task if the last task has cleared
                         // Only schedule a new task if the last task has cleared
                         // the queue of RPC requests.
                         // the queue of RPC requests.
-                        if openings[i].0.fetch_add(1, Ordering::AcqRel) == 0 {
+                        if progress.should_schedule() {
                             task_number += 1;
                             task_number += 1;
                             this.thread_pool.spawn(Self::sync_log_entries(
                             this.thread_pool.spawn(Self::sync_log_entries(
                                 this.inner_state.clone(),
                                 this.inner_state.clone(),
                                 rpc_client.clone(),
                                 rpc_client.clone(),
-                                i,
-                                this.new_log_entry.clone().unwrap(),
-                                openings[i].0.clone(),
+                                this.sync_log_entries_comms.clone(),
+                                progress.clone(),
                                 this.apply_command_signal.clone(),
                                 this.apply_command_signal.clone(),
                                 this.term_marker(),
                                 this.term_marker(),
                                 this.beat_ticker(i),
                                 this.beat_ticker(i),
@@ -151,20 +207,20 @@ impl<Command: ReplicableCommand> Raft<Command> {
     async fn sync_log_entries(
     async fn sync_log_entries(
         rf: Arc<Mutex<RaftState<Command>>>,
         rf: Arc<Mutex<RaftState<Command>>>,
         rpc_client: impl RemoteRaft<Command>,
         rpc_client: impl RemoteRaft<Command>,
-        peer_index: usize,
-        rerun: SharedSender<Option<Peer>>,
-        opening: Arc<AtomicUsize>,
+        comms: SyncLogEntriesComms,
+        progress: PeerProgress,
         apply_command_signal: Arc<Condvar>,
         apply_command_signal: Arc<Condvar>,
         term_marker: TermMarker<Command>,
         term_marker: TermMarker<Command>,
         beat_ticker: DaemonBeatTicker,
         beat_ticker: DaemonBeatTicker,
         task_number: TaskNumber,
         task_number: TaskNumber,
     ) {
     ) {
-        if opening.swap(0, Ordering::AcqRel) == 0 {
+        if !progress.take_task() {
             return;
             return;
         }
         }
 
 
+        let peer = progress.peer;
         let operation =
         let operation =
-            Self::build_sync_log_entries(&rf, peer_index, task_number);
+            Self::build_sync_log_entries(&rf, &progress, task_number);
         let (term, prev_log_index, match_index, succeeded) = match operation {
         let (term, prev_log_index, match_index, succeeded) = match operation {
             SyncLogEntriesOperation::AppendEntries(args) => {
             SyncLogEntriesOperation::AppendEntries(args) => {
                 let term = args.term;
                 let term = args.term;
@@ -188,7 +244,6 @@ impl<Command: ReplicableCommand> Raft<Command> {
             SyncLogEntriesOperation::None => return,
             SyncLogEntriesOperation::None => return,
         };
         };
 
 
-        let peer = Peer(peer_index);
         match succeeded {
         match succeeded {
             Ok(SyncLogEntriesResult::Success) => {
             Ok(SyncLogEntriesResult::Success) => {
                 let mut rf = rf.lock();
                 let mut rf = rf.lock();
@@ -208,8 +263,8 @@ impl<Command: ReplicableCommand> Raft<Command> {
                     &rf
                     &rf
                 );
                 );
 
 
-                rf.next_index[peer_index] = match_index + 1;
-                rf.current_step[peer_index] = 0;
+                progress.record_success(match_index);
+                let peer_index = peer.0;
                 if match_index > rf.match_index[peer_index] {
                 if match_index > rf.match_index[peer_index] {
                     rf.match_index[peer_index] = match_index;
                     rf.match_index[peer_index] = match_index;
                     let mut matched = rf.match_index.to_vec();
                     let mut matched = rf.match_index.to_vec();
@@ -260,11 +315,11 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 if prev_log_index == match_index {
                 if prev_log_index == match_index {
                     // If we did not make any progress this time, try again.
                     // If we did not make any progress this time, try again.
                     // This can only happen when installing snapshots.
                     // This can only happen when installing snapshots.
-                    let _ = rerun.send(Some(Peer(peer_index)));
+                    comms.rerun(peer);
                 }
                 }
             }
             }
             Ok(SyncLogEntriesResult::Archived(committed)) => {
             Ok(SyncLogEntriesResult::Archived(committed)) => {
-                let mut rf = rf.lock();
+                let rf = rf.lock();
 
 
                 check_or_record!(
                 check_or_record!(
                     prev_log_index < committed.index,
                     prev_log_index < committed.index,
@@ -273,26 +328,25 @@ impl<Command: ReplicableCommand> Raft<Command> {
                         committed.index
                         committed.index
                     ),
                     ),
                     format!(
                     format!(
-                        "Peer {} misbehaves: claimed log index {} is archived, \
+                        "{:?} misbehaves: claimed log index {} is archived, \
                         but commit index is at {:?}) which is before that",
                         but commit index is at {:?}) which is before that",
-                        peer_index, prev_log_index, committed
+                        peer, prev_log_index, committed
                     ),
                     ),
                     &rf
                     &rf
                 );
                 );
 
 
                 Self::check_committed(&rf, peer, committed.clone());
                 Self::check_committed(&rf, peer, committed.clone());
 
 
-                rf.current_step[peer_index] = 0;
                 // Next index moves towards the log end. This is the only place
                 // Next index moves towards the log end. This is the only place
                 // where that happens. committed.index should be between log
                 // where that happens. committed.index should be between log
                 // start and end, guaranteed by check_committed() above.
                 // start and end, guaranteed by check_committed() above.
-                rf.next_index[peer_index] = committed.index + 1;
+                progress.record_success(committed.index + 1);
 
 
                 // Ignore the error. The log syncing thread must have died.
                 // Ignore the error. The log syncing thread must have died.
-                let _ = rerun.send(Some(Peer(peer_index)));
+                comms.rerun(peer);
             }
             }
             Ok(SyncLogEntriesResult::Diverged(committed)) => {
             Ok(SyncLogEntriesResult::Diverged(committed)) => {
-                let mut rf = rf.lock();
+                let rf = rf.lock();
                 check_or_record!(
                 check_or_record!(
                     prev_log_index > committed.index,
                     prev_log_index > committed.index,
                     ErrorKind::DivergedBeforeCommitted(
                     ErrorKind::DivergedBeforeCommitted(
@@ -300,33 +354,18 @@ impl<Command: ReplicableCommand> Raft<Command> {
                         committed.index
                         committed.index
                     ),
                     ),
                     format!(
                     format!(
-                        "Peer {} claimed log index {} does not match, \
+                        "{:?} claimed log index {} does not match, \
                          but commit index is at {:?}) which is after that.",
                          but commit index is at {:?}) which is after that.",
-                        peer_index, prev_log_index, committed
+                        peer, prev_log_index, committed
                     ),
                     ),
                     &rf
                     &rf
                 );
                 );
                 Self::check_committed(&rf, peer, committed.clone());
                 Self::check_committed(&rf, peer, committed.clone());
 
 
-                let step = &mut rf.current_step[peer_index];
-                if *step < 5 {
-                    *step += 1;
-                }
-                let diff = 4 << *step;
-
-                let next_index = &mut rf.next_index[peer_index];
-                if diff >= *next_index {
-                    *next_index = 1usize;
-                } else {
-                    *next_index -= diff;
-                }
-
-                if *next_index < committed.index {
-                    *next_index = committed.index;
-                }
+                progress.record_failure(committed.index);
 
 
                 // Ignore the error. The log syncing thread must have died.
                 // Ignore the error. The log syncing thread must have died.
-                let _ = rerun.send(Some(Peer(peer_index)));
+                comms.rerun(peer);
             }
             }
             // Do nothing, not our term anymore.
             // Do nothing, not our term anymore.
             Ok(SyncLogEntriesResult::TermElapsed(term)) => {
             Ok(SyncLogEntriesResult::TermElapsed(term)) => {
@@ -335,7 +374,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             Err(_) => {
             Err(_) => {
                 tokio::time::sleep(HEARTBEAT_INTERVAL).await;
                 tokio::time::sleep(HEARTBEAT_INTERVAL).await;
                 // Ignore the error. The log syncing thread must have died.
                 // Ignore the error. The log syncing thread must have died.
-                let _ = rerun.send(Some(Peer(peer_index)));
+                comms.rerun(peer);
             }
             }
         };
         };
     }
     }
@@ -374,7 +413,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
 
 
     fn build_sync_log_entries(
     fn build_sync_log_entries(
         rf: &Mutex<RaftState<Command>>,
         rf: &Mutex<RaftState<Command>>,
-        peer_index: usize,
+        progress: &PeerProgress,
         task_number: TaskNumber,
         task_number: TaskNumber,
     ) -> SyncLogEntriesOperation<Command> {
     ) -> SyncLogEntriesOperation<Command> {
         let rf = rf.lock();
         let rf = rf.lock();
@@ -382,27 +421,30 @@ impl<Command: ReplicableCommand> Raft<Command> {
             return SyncLogEntriesOperation::None;
             return SyncLogEntriesOperation::None;
         }
         }
 
 
+        let peer = progress.peer;
+
         // To send AppendEntries request, next_index must be strictly larger
         // To send AppendEntries request, next_index must be strictly larger
         // than start(). Otherwise we won't be able to know the log term of the
         // than start(). Otherwise we won't be able to know the log term of the
         // entry right before next_index.
         // entry right before next_index.
-        if rf.next_index[peer_index] > rf.log.start() {
-            if rf.next_index[peer_index] < rf.log.end() {
+        let next_index = progress.next_index();
+        if next_index > rf.log.start() {
+            if next_index < rf.log.end() {
                 log::debug!(
                 log::debug!(
                     "{:?} building append entries {:?} from {} to {:?}",
                     "{:?} building append entries {:?} from {} to {:?}",
                     rf.leader_id,
                     rf.leader_id,
                     task_number,
                     task_number,
-                    rf.next_index[peer_index] - 1,
-                    Peer(peer_index)
+                    next_index - 1,
+                    peer
                 );
                 );
                 SyncLogEntriesOperation::AppendEntries(
                 SyncLogEntriesOperation::AppendEntries(
-                    Self::build_append_entries(&rf, peer_index),
+                    Self::build_append_entries(&rf, next_index),
                 )
                 )
             } else {
             } else {
                 log::debug!(
                 log::debug!(
                     "{:?} nothing in append entries {:?} to {:?}",
                     "{:?} nothing in append entries {:?} to {:?}",
                     rf.leader_id,
                     rf.leader_id,
                     task_number,
                     task_number,
-                    Peer(peer_index)
+                    peer
                 );
                 );
                 SyncLogEntriesOperation::None
                 SyncLogEntriesOperation::None
             }
             }
@@ -412,7 +454,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 rf.leader_id,
                 rf.leader_id,
                 task_number,
                 task_number,
                 rf.log.first_index_term().index,
                 rf.log.first_index_term().index,
-                Peer(peer_index)
+                peer,
             );
             );
             SyncLogEntriesOperation::InstallSnapshot(
             SyncLogEntriesOperation::InstallSnapshot(
                 Self::build_install_snapshot(&rf),
                 Self::build_install_snapshot(&rf),
@@ -422,17 +464,17 @@ impl<Command: ReplicableCommand> Raft<Command> {
 
 
     fn build_append_entries(
     fn build_append_entries(
         rf: &RaftState<Command>,
         rf: &RaftState<Command>,
-        peer_index: usize,
+        next_index: Index,
     ) -> AppendEntriesArgs<Command> {
     ) -> AppendEntriesArgs<Command> {
         // It is guaranteed that next_index <= rf.log.end(). Panic otherwise.
         // It is guaranteed that next_index <= rf.log.end(). Panic otherwise.
-        let prev_log_index = rf.next_index[peer_index] - 1;
+        let prev_log_index = next_index - 1;
         let prev_log_term = rf.log.at(prev_log_index).term;
         let prev_log_term = rf.log.at(prev_log_index).term;
         AppendEntriesArgs {
         AppendEntriesArgs {
             term: rf.current_term,
             term: rf.current_term,
             leader_id: rf.leader_id,
             leader_id: rf.leader_id,
             prev_log_index,
             prev_log_index,
             prev_log_term,
             prev_log_term,
-            entries: rf.log.after(rf.next_index[peer_index]).to_vec(),
+            entries: rf.log.after(next_index).to_vec(),
             leader_commit: rf.commit_index,
             leader_commit: rf.commit_index,
         }
         }
     }
     }