
Merge Peer into PeerProgress.

Jing Yang 3 years ago
commit
9b1995e432
2 changed files with 50 additions and 37 deletions
  1. src/peer_progress.rs  + 35 - 15
  2. src/sync_log_entries.rs  + 15 - 22

+ 35 - 15
src/peer_progress.rs

@@ -1,27 +1,46 @@
-use crate::Index;
+use std::sync::Arc;
 
-#[derive(Default)]
-#[repr(align(64))]
-pub(crate) struct PeerProgress {
+use parking_lot::Mutex;
+
+use crate::{Index, Peer};
+
+struct PeerProgressInternal {
     next_index: Index,
     current_step: i64,
 }
 
+#[derive(Clone)]
+pub(crate) struct PeerProgress {
+    pub peer: Peer,
+    internal: Arc<Mutex<PeerProgressInternal>>,
+}
+
 impl PeerProgress {
-    pub fn reset_progress(&mut self, next_index: Index) {
-        assert!(next_index > 0, "Cannot reset next index to zero");
-        self.next_index = next_index;
-        self.current_step = 0;
+    pub fn create(peer_index: usize) -> Self {
+        Self {
+            peer: Peer(peer_index),
+            internal: Arc::new(Mutex::new(PeerProgressInternal {
+                next_index: 1,
+                current_step: 0,
+            })),
+        }
+    }
+
+    pub fn reset_progress(&self, next_index: Index) {
+        let mut internal = self.internal.lock();
+        internal.next_index = next_index;
+        internal.current_step = 0;
     }
 
-    pub fn record_failure(&mut self, committed_index: Index) {
-        let step = &mut self.current_step;
+    pub fn record_failure(&self, committed_index: Index) {
+        let mut internal = self.internal.lock();
+        let step = &mut internal.current_step;
         if *step < 5 {
             *step += 1;
         }
         let diff = 4 << *step;
 
-        let next_index = &mut self.next_index;
+        let next_index = &mut internal.next_index;
         if diff >= *next_index {
             *next_index = 1usize;
         } else {
@@ -33,12 +52,13 @@ impl PeerProgress {
         }
     }
 
-    pub fn record_success(&mut self, match_index: Index) {
-        self.next_index = match_index + 1;
-        self.current_step = 0;
+    pub fn record_success(&self, match_index: Index) {
+        let mut internal = self.internal.lock();
+        internal.next_index = match_index + 1;
+        internal.current_step = 0;
     }
 
     pub fn next_index(&self) -> Index {
-        self.next_index
+        self.internal.lock().next_index
     }
 }
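
For reference, a minimal, self-contained sketch of how the new clonable handle behaves (not part of the commit; `Index` is assumed to be `usize`, consistent with the `1usize` assignment in record_failure, and parking_lot is assumed as a dependency, as in the diff):

use std::sync::Arc;
use parking_lot::Mutex;

type Index = usize;

#[derive(Clone, Copy)]
pub struct Peer(pub usize);

struct PeerProgressInternal {
    next_index: Index,
    current_step: i64,
}

#[derive(Clone)]
pub struct PeerProgress {
    pub peer: Peer,
    internal: Arc<Mutex<PeerProgressInternal>>,
}

impl PeerProgress {
    pub fn create(peer_index: usize) -> Self {
        Self {
            peer: Peer(peer_index),
            internal: Arc::new(Mutex::new(PeerProgressInternal {
                next_index: 1,
                current_step: 0,
            })),
        }
    }

    pub fn record_success(&self, match_index: Index) {
        // Locking happens inside the handle, so callers no longer
        // need an outer Arc<Mutex<PeerProgress>>.
        let mut internal = self.internal.lock();
        internal.next_index = match_index + 1;
        internal.current_step = 0;
    }

    pub fn next_index(&self) -> Index {
        self.internal.lock().next_index
    }
}

fn main() {
    let progress = PeerProgress::create(2);
    // Cloning copies the Arc, so every clone observes the same
    // shared state behind the mutex.
    let handle = progress.clone();
    std::thread::spawn(move || handle.record_success(10))
        .join()
        .unwrap();
    assert_eq!(progress.next_index(), 11);
}

Each clone also carries its peer identity via the public `peer` field, which is what lets the commit drop the separate `Peer` argument from sync_log_entries and build_sync_log_entries in the second file below.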

+ 15 - 22
src/sync_log_entries.rs

@@ -62,7 +62,7 @@ impl SyncLogEntriesComms {
 
 pub(crate) struct SyncLogEntriesDaemon {
     rx: std::sync::mpsc::Receiver<Event>,
-    peer_progress: Vec<Arc<Mutex<PeerProgress>>>,
+    peer_progress: Vec<PeerProgress>,
 }
 
 pub(crate) fn create(
@@ -70,8 +70,7 @@ pub(crate) fn create(
 ) -> (SyncLogEntriesComms, SyncLogEntriesDaemon) {
     let (tx, rx) = std::sync::mpsc::channel();
     let tx = SharedSender::new(tx);
-    let mut peer_progress = Vec::with_capacity(peer_size);
-    peer_progress.resize_with(peer_size, Default::default);
+    let peer_progress = (0..peer_size).map(PeerProgress::create).collect();
     (
         SyncLogEntriesComms { tx },
         SyncLogEntriesDaemon { rx, peer_progress },
@@ -145,7 +144,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 for (i, rpc_client) in this.peers.iter().enumerate() {
                     if i != this.me.0 && event.should_schedule(Peer(i)) {
                         if let Event::NewTerm(_term, index) = event {
-                            peer_progress[i].lock().reset_progress(index);
+                            peer_progress[i].reset_progress(index);
                         }
                         // Only schedule a new task if the last task has cleared
                         // the queue of RPC requests.
@@ -154,7 +153,6 @@ impl<Command: ReplicableCommand> Raft<Command> {
                             this.thread_pool.spawn(Self::sync_log_entries(
                                 this.inner_state.clone(),
                                 rpc_client.clone(),
-                                Peer(i),
                                 this.sync_log_entries_comms.clone(),
                                 peer_progress[i].clone(),
                                 openings[i].0.clone(),
@@ -218,9 +216,8 @@ impl<Command: ReplicableCommand> Raft<Command> {
     async fn sync_log_entries(
         rf: Arc<Mutex<RaftState<Command>>>,
         rpc_client: impl RemoteRaft<Command>,
-        peer: Peer,
         comms: SyncLogEntriesComms,
-        progress: Arc<Mutex<PeerProgress>>,
+        progress: PeerProgress,
         opening: Arc<AtomicUsize>,
         apply_command_signal: Arc<Condvar>,
         term_marker: TermMarker<Command>,
@@ -231,12 +228,9 @@ impl<Command: ReplicableCommand> Raft<Command> {
             return;
         }
 
-        let operation = Self::build_sync_log_entries(
-            &rf,
-            peer,
-            progress.clone(),
-            task_number,
-        );
+        let peer = progress.peer;
+        let operation =
+            Self::build_sync_log_entries(&rf, &progress, task_number);
         let (term, prev_log_index, match_index, succeeded) = match operation {
             SyncLogEntriesOperation::AppendEntries(args) => {
                 let term = args.term;
@@ -279,7 +273,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                     &rf
                 );
 
-                progress.lock().record_success(match_index);
+                progress.record_success(match_index);
                 let peer_index = peer.0;
                 if match_index > rf.match_index[peer_index] {
                     rf.match_index[peer_index] = match_index;
@@ -356,7 +350,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 // Next index moves towards the log end. This is the only place
                 // where that happens. committed.index should be between log
                 // start and end, guaranteed by check_committed() above.
-                progress.lock().record_success(committed.index + 1);
+                progress.record_success(committed.index + 1);
 
                 // Ignore the error. The log syncing thread must have died.
                 comms.rerun(peer);
@@ -378,7 +372,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 );
                 Self::check_committed(&rf, peer, committed.clone());
 
-                progress.lock().record_failure(committed.index);
+                progress.record_failure(committed.index);
 
                 // Ignore the error. The log syncing thread must have died.
                 comms.rerun(peer);
@@ -429,8 +423,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
 
     fn build_sync_log_entries(
         rf: &Mutex<RaftState<Command>>,
-        peer: Peer,
-        progress: Arc<Mutex<PeerProgress>>,
+        progress: &PeerProgress,
         task_number: TaskNumber,
     ) -> SyncLogEntriesOperation<Command> {
         let rf = rf.lock();
@@ -438,10 +431,11 @@ impl<Command: ReplicableCommand> Raft<Command> {
             return SyncLogEntriesOperation::None;
         }
 
+        let peer = progress.peer;
+
         // To send AppendEntries request, next_index must be strictly larger
         // than start(). Otherwise we won't be able to know the log term of the
         // entry right before next_index.
-        let progress = progress.lock();
         let next_index = progress.next_index();
         if next_index > rf.log.start() {
             if next_index < rf.log.end() {
@@ -453,7 +447,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                     peer
                 );
                 SyncLogEntriesOperation::AppendEntries(
-                    Self::build_append_entries(&rf, &progress),
+                    Self::build_append_entries(&rf, next_index),
                 )
             } else {
                 log::debug!(
@@ -480,9 +474,8 @@ impl<Command: ReplicableCommand> Raft<Command> {
 
     fn build_append_entries(
         rf: &RaftState<Command>,
-        progress: &PeerProgress,
+        next_index: Index,
     ) -> AppendEntriesArgs<Command> {
-        let next_index = progress.next_index();
         // It is guaranteed that next_index <= rf.log.end(). Panic otherwise.
         let prev_log_index = next_index - 1;
         let prev_log_term = rf.log.at(prev_log_index).term;
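
As a side note on the retry arithmetic this commit leaves unchanged: record_failure saturates current_step at 5, so the back-off distance `4 << step` grows exponentially on consecutive failures and then stops growing. A small sketch of the resulting distances (not part of the commit; illustration only):

// Back-off distances produced by consecutive failures; current_step
// saturates at 5, so the distance never exceeds 4 << 5 = 128.
fn backoff_distance(step: u32) -> usize {
    4usize << step
}

fn main() {
    let distances: Vec<usize> = (1..=5).map(backoff_distance).collect();
    assert_eq!(distances, vec![8, 16, 32, 64, 128]);
    // Once the distance reaches or exceeds next_index, the tracker
    // falls back to index 1, as seen in the first hunk of
    // peer_progress.rs above.
}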