Selaa lähdekoodia

Move next_index and current_step out of RaftState.

Jing Yang 3 vuotta sitten
vanhempi
commit
e026ab6803
5 muutettua tiedostoa jossa 110 lisäystä ja 56 poistoa
  1. 1 1
      src/election.rs
  2. 1 0
      src/lib.rs
  3. 44 0
      src/peer_progress.rs
  4. 1 1
      src/raft.rs
  5. 63 54
      src/sync_log_entries.rs

+ 1 - 1
src/election.rs

@@ -381,7 +381,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             verify_authority_daemon.reset_state(term, sentinel_commit_index);
 
             // Sync all logs now.
-            new_log_entry.update_followers(sentinel_commit_index);
+            new_log_entry.reset_progress(term, sentinel_commit_index);
         }
     }
 }

+ 1 - 0
src/lib.rs

@@ -21,6 +21,7 @@ mod heartbeats;
 mod index_term;
 mod log_array;
 mod messages;
+mod peer_progress;
 mod persister;
 mod process_append_entries;
 mod process_install_snapshot;

+ 44 - 0
src/peer_progress.rs

@@ -0,0 +1,44 @@
+use crate::Index;
+
+#[derive(Default)]
+#[repr(align(64))]
+pub(crate) struct PeerProgress {
+    next_index: Index,
+    current_step: i64,
+}
+
+impl PeerProgress {
+    pub fn reset_progress(&mut self, next_index: Index) {
+        assert!(next_index > 0, "Cannot reset next index to zero");
+        self.next_index = next_index;
+        self.current_step = 0;
+    }
+
+    pub fn record_failure(&mut self, committed_index: Index) {
+        let step = &mut self.current_step;
+        if *step < 5 {
+            *step += 1;
+        }
+        let diff = 4 << *step;
+
+        let next_index = &mut self.next_index;
+        if diff >= *next_index {
+            *next_index = 1usize;
+        } else {
+            *next_index -= diff;
+        }
+
+        if *next_index < committed_index {
+            *next_index = committed_index;
+        }
+    }
+
+    pub fn record_success(&mut self, match_index: Index) {
+        self.next_index = match_index + 1;
+        self.current_step = 0;
+    }
+
+    pub fn next_index(&self) -> Index {
+        self.next_index
+    }
+}

+ 1 - 1
src/raft.rs

@@ -105,7 +105,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             .map(|r| Arc::new(r) as Arc<dyn RemoteRaft<Command>>)
             .collect();
         let (sync_log_entries_comms, sync_log_entries_daemon) =
-            crate::sync_log_entries::create();
+            crate::sync_log_entries::create(peer_size);
 
         let mut this = Raft {
             inner_state: Arc::new(Mutex::new(state)),

+ 63 - 54
src/sync_log_entries.rs

@@ -5,6 +5,7 @@ use parking_lot::{Condvar, Mutex};
 
 use crate::daemon_env::{Daemon, ErrorKind};
 use crate::heartbeats::HEARTBEAT_INTERVAL;
+use crate::peer_progress::PeerProgress;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
 use crate::verify_authority::DaemonBeatTicker;
@@ -13,9 +14,9 @@ use crate::{
     Peer, Raft, RaftState, RemoteRaft, ReplicableCommand, Term,
 };
 
-#[derive(Clone)]
+#[derive(Clone, Eq, PartialEq)]
 enum Event {
-    NewTerm(Term),
+    NewTerm(Term, Index),
     NewLogEntry(Index),
     Rerun(Peer),
     Shutdown,
@@ -24,7 +25,7 @@ enum Event {
 impl Event {
     fn should_schedule(&self, peer: Peer) -> bool {
         match self {
-            Event::NewTerm(_term) => true,
+            Event::NewTerm(..) => true,
             Event::NewLogEntry(_index) => true,
             Event::Rerun(p) => p == &peer,
             Event::Shutdown => false,
@@ -43,26 +44,38 @@ impl SyncLogEntriesComms {
         let _ = self.tx.send(Event::NewLogEntry(index));
     }
 
+    pub fn reset_progress(&self, term: Term, index: Index) {
+        let _ = self.tx.send(Event::NewTerm(term, index));
+    }
+
     pub fn kill(&self) {
         self.tx
             .send(Event::Shutdown)
             .expect("The sync log entries daemon should still be alive");
     }
 
-    fn rerun(&self, peer_index: usize) {
+    fn rerun(&self, peer: Peer) {
         // Ignore the error. The log syncing thread must have died.
-        let _ = self.tx.send(Event::Rerun(Peer(peer_index)));
+        let _ = self.tx.send(Event::Rerun(peer));
     }
 }
 
 pub(crate) struct SyncLogEntriesDaemon {
     rx: std::sync::mpsc::Receiver<Event>,
+    peer_progress: Vec<Arc<Mutex<PeerProgress>>>,
 }
 
-pub(crate) fn create() -> (SyncLogEntriesComms, SyncLogEntriesDaemon) {
+pub(crate) fn create(
+    peer_size: usize,
+) -> (SyncLogEntriesComms, SyncLogEntriesDaemon) {
     let (tx, rx) = std::sync::mpsc::channel();
     let tx = SharedSender::new(tx);
-    (SyncLogEntriesComms { tx }, SyncLogEntriesDaemon { rx })
+    let mut peer_progress = Vec::with_capacity(peer_size);
+    peer_progress.resize_with(peer_size, Default::default);
+    (
+        SyncLogEntriesComms { tx },
+        SyncLogEntriesDaemon { rx, peer_progress },
+    )
 }
 
 #[repr(align(64))]
@@ -108,7 +121,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
     /// and backoff strategy.
     pub(crate) fn run_log_entry_daemon(
         &self,
-        SyncLogEntriesDaemon { rx }: SyncLogEntriesDaemon,
+        SyncLogEntriesDaemon { rx, peer_progress }: SyncLogEntriesDaemon,
     ) {
         // Clone everything that the thread needs.
         let this = self.clone();
@@ -131,6 +144,9 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 }
                 for (i, rpc_client) in this.peers.iter().enumerate() {
                     if i != this.me.0 && event.should_schedule(Peer(i)) {
+                        if let Event::NewTerm(_term, index) = event {
+                            peer_progress[i].lock().reset_progress(index);
+                        }
                         // Only schedule a new task if the last task has cleared
                         // the queue of RPC requests.
                         if openings[i].0.fetch_add(1, Ordering::AcqRel) == 0 {
@@ -138,8 +154,9 @@ impl<Command: ReplicableCommand> Raft<Command> {
                             this.thread_pool.spawn(Self::sync_log_entries(
                                 this.inner_state.clone(),
                                 rpc_client.clone(),
-                                i,
+                                Peer(i),
                                 this.sync_log_entries_comms.clone(),
+                                peer_progress[i].clone(),
                                 openings[i].0.clone(),
                                 this.apply_command_signal.clone(),
                                 this.term_marker(),
@@ -201,8 +218,9 @@ impl<Command: ReplicableCommand> Raft<Command> {
     async fn sync_log_entries(
         rf: Arc<Mutex<RaftState<Command>>>,
         rpc_client: impl RemoteRaft<Command>,
-        peer_index: usize,
+        peer: Peer,
         comms: SyncLogEntriesComms,
+        progress: Arc<Mutex<PeerProgress>>,
         opening: Arc<AtomicUsize>,
         apply_command_signal: Arc<Condvar>,
         term_marker: TermMarker<Command>,
@@ -213,8 +231,12 @@ impl<Command: ReplicableCommand> Raft<Command> {
             return;
         }
 
-        let operation =
-            Self::build_sync_log_entries(&rf, peer_index, task_number);
+        let operation = Self::build_sync_log_entries(
+            &rf,
+            peer,
+            progress.clone(),
+            task_number,
+        );
         let (term, prev_log_index, match_index, succeeded) = match operation {
             SyncLogEntriesOperation::AppendEntries(args) => {
                 let term = args.term;
@@ -238,7 +260,6 @@ impl<Command: ReplicableCommand> Raft<Command> {
             SyncLogEntriesOperation::None => return,
         };
 
-        let peer = Peer(peer_index);
         match succeeded {
             Ok(SyncLogEntriesResult::Success) => {
                 let mut rf = rf.lock();
@@ -258,8 +279,8 @@ impl<Command: ReplicableCommand> Raft<Command> {
                     &rf
                 );
 
-                rf.next_index[peer_index] = match_index + 1;
-                rf.current_step[peer_index] = 0;
+                progress.lock().record_success(match_index);
+                let peer_index = peer.0;
                 if match_index > rf.match_index[peer_index] {
                     rf.match_index[peer_index] = match_index;
                     let mut matched = rf.match_index.to_vec();
@@ -310,11 +331,11 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 if prev_log_index == match_index {
                     // If we did not make any progress this time, try again.
                     // This can only happen when installing snapshots.
-                    comms.rerun(peer_index);
+                    comms.rerun(peer);
                 }
             }
             Ok(SyncLogEntriesResult::Archived(committed)) => {
-                let mut rf = rf.lock();
+                let rf = rf.lock();
 
                 check_or_record!(
                     prev_log_index < committed.index,
@@ -323,26 +344,25 @@ impl<Command: ReplicableCommand> Raft<Command> {
                         committed.index
                     ),
                     format!(
-                        "Peer {} misbehaves: claimed log index {} is archived, \
+                        "{:?} misbehaves: claimed log index {} is archived, \
                         but commit index is at {:?}) which is before that",
-                        peer_index, prev_log_index, committed
+                        peer, prev_log_index, committed
                     ),
                     &rf
                 );
 
                 Self::check_committed(&rf, peer, committed.clone());
 
-                rf.current_step[peer_index] = 0;
                 // Next index moves towards the log end. This is the only place
                 // where that happens. committed.index should be between log
                 // start and end, guaranteed by check_committed() above.
-                rf.next_index[peer_index] = committed.index + 1;
+                progress.lock().record_success(committed.index);
 
                 // Ignore the error. The log syncing thread must have died.
-                comms.rerun(peer_index);
+                comms.rerun(peer);
             }
             Ok(SyncLogEntriesResult::Diverged(committed)) => {
-                let mut rf = rf.lock();
+                let rf = rf.lock();
                 check_or_record!(
                     prev_log_index > committed.index,
                     ErrorKind::DivergedBeforeCommitted(
@@ -350,33 +370,18 @@ impl<Command: ReplicableCommand> Raft<Command> {
                         committed.index
                     ),
                     format!(
-                        "Peer {} claimed log index {} does not match, \
+                        "{:?} claimed log index {} does not match, \
                          but commit index is at {:?}) which is after that.",
-                        peer_index, prev_log_index, committed
+                        peer, prev_log_index, committed
                     ),
                     &rf
                 );
                 Self::check_committed(&rf, peer, committed.clone());
 
-                let step = &mut rf.current_step[peer_index];
-                if *step < 5 {
-                    *step += 1;
-                }
-                let diff = 4 << *step;
-
-                let next_index = &mut rf.next_index[peer_index];
-                if diff >= *next_index {
-                    *next_index = 1usize;
-                } else {
-                    *next_index -= diff;
-                }
-
-                if *next_index < committed.index {
-                    *next_index = committed.index;
-                }
+                progress.lock().record_failure(committed.index);
 
                 // Ignore the error. The log syncing thread must have died.
-                comms.rerun(peer_index);
+                comms.rerun(peer);
             }
             // Do nothing, not our term anymore.
             Ok(SyncLogEntriesResult::TermElapsed(term)) => {
@@ -385,7 +390,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             Err(_) => {
                 tokio::time::sleep(HEARTBEAT_INTERVAL).await;
                 // Ignore the error. The log syncing thread must have died.
-                comms.rerun(peer_index);
+                comms.rerun(peer);
             }
         };
     }
@@ -424,7 +429,8 @@ impl<Command: ReplicableCommand> Raft<Command> {
 
     fn build_sync_log_entries(
         rf: &Mutex<RaftState<Command>>,
-        peer_index: usize,
+        peer: Peer,
+        progress: Arc<Mutex<PeerProgress>>,
         task_number: TaskNumber,
     ) -> SyncLogEntriesOperation<Command> {
         let rf = rf.lock();
@@ -435,24 +441,26 @@ impl<Command: ReplicableCommand> Raft<Command> {
         // To send AppendEntries request, next_index must be strictly larger
         // than start(). Otherwise we won't be able to know the log term of the
         // entry right before next_index.
-        if rf.next_index[peer_index] > rf.log.start() {
-            if rf.next_index[peer_index] < rf.log.end() {
+        let progress = progress.lock();
+        let next_index = progress.next_index();
+        if next_index > rf.log.start() {
+            if next_index < rf.log.end() {
                 log::debug!(
                     "{:?} building append entries {:?} from {} to {:?}",
                     rf.leader_id,
                     task_number,
-                    rf.next_index[peer_index] - 1,
-                    Peer(peer_index)
+                    next_index - 1,
+                    peer
                 );
                 SyncLogEntriesOperation::AppendEntries(
-                    Self::build_append_entries(&rf, peer_index),
+                    Self::build_append_entries(&rf, &progress),
                 )
             } else {
                 log::debug!(
                     "{:?} nothing in append entries {:?} to {:?}",
                     rf.leader_id,
                     task_number,
-                    Peer(peer_index)
+                    peer
                 );
                 SyncLogEntriesOperation::None
             }
@@ -462,7 +470,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 rf.leader_id,
                 task_number,
                 rf.log.first_index_term().index,
-                Peer(peer_index)
+                peer,
             );
             SyncLogEntriesOperation::InstallSnapshot(
                 Self::build_install_snapshot(&rf),
@@ -472,17 +480,18 @@ impl<Command: ReplicableCommand> Raft<Command> {
 
     fn build_append_entries(
         rf: &RaftState<Command>,
-        peer_index: usize,
+        progress: &PeerProgress,
     ) -> AppendEntriesArgs<Command> {
+        let next_index = progress.next_index();
         // It is guaranteed that next_index <= rf.log.end(). Panic otherwise.
-        let prev_log_index = rf.next_index[peer_index] - 1;
+        let prev_log_index = next_index - 1;
         let prev_log_term = rf.log.at(prev_log_index).term;
         AppendEntriesArgs {
             term: rf.current_term,
             leader_id: rf.leader_id,
             prev_log_index,
             prev_log_term,
-            entries: rf.log.after(rf.next_index[peer_index]).to_vec(),
+            entries: rf.log.after(next_index).to_vec(),
             leader_commit: rf.commit_index,
         }
     }