
Merge Opening into PeerProgress.

Jing Yang 3 years ago
parent
commit
8c80a8b26c
2 changed files with 34 additions and 26 deletions
  1. src/peer_progress.rs: 28 additions, 10 deletions
  2. src/sync_log_entries.rs: 6 additions, 16 deletions

+ 28 - 10
src/peer_progress.rs

@@ -1,39 +1,57 @@
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 
 use parking_lot::Mutex;
 
 use crate::{Index, Peer};
 
-struct PeerProgressInternal {
+struct SharedIndexes {
     next_index: Index,
     current_step: i64,
 }
 
+struct SharedProgress {
+    opening: AtomicUsize,
+    indexes: Mutex<SharedIndexes>,
+}
+
 #[derive(Clone)]
+#[repr(align(64))]
 pub(crate) struct PeerProgress {
     pub peer: Peer,
-    internal: Arc<Mutex<PeerProgressInternal>>,
+    internal: Arc<SharedProgress>,
 }
 
 impl PeerProgress {
     pub fn create(peer_index: usize) -> Self {
         Self {
             peer: Peer(peer_index),
-            internal: Arc::new(Mutex::new(PeerProgressInternal {
-                next_index: 1,
-                current_step: 0,
-            })),
+            internal: Arc::new(SharedProgress {
+                opening: AtomicUsize::new(0),
+                indexes: Mutex::new(SharedIndexes {
+                    next_index: 1,
+                    current_step: 0,
+                }),
+            }),
         }
     }
 
+    pub fn should_schedule(&self) -> bool {
+        self.internal.opening.fetch_add(1, Ordering::AcqRel) == 0
+    }
+
+    pub fn take_task(&self) -> bool {
+        self.internal.opening.swap(0, Ordering::AcqRel) != 0
+    }
+
     pub fn reset_progress(&self, next_index: Index) {
-        let mut internal = self.internal.lock();
+        let mut internal = self.internal.indexes.lock();
         internal.next_index = next_index;
         internal.current_step = 0;
     }
 
     pub fn record_failure(&self, committed_index: Index) {
-        let mut internal = self.internal.lock();
+        let mut internal = self.internal.indexes.lock();
         let step = &mut internal.current_step;
         if *step < 5 {
             *step += 1;
@@ -53,12 +71,12 @@ impl PeerProgress {
     }
 
     pub fn record_success(&self, match_index: Index) {
-        let mut internal = self.internal.lock();
+        let mut internal = self.internal.indexes.lock();
         internal.next_index = match_index + 1;
         internal.current_step = 0;
     }
 
     pub fn next_index(&self) -> Index {
-        self.internal.lock().next_index
+        self.internal.indexes.lock().next_index
     }
 }
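
Note: PeerProgress above derives Clone, but every clone shares one Arc<SharedProgress>, so progress recorded through one handle is visible through all the others. Below is a minimal standalone sketch of that sharing pattern, using std::sync::Mutex and placeholder names rather than the crate's parking_lot-backed types:

    use std::sync::{Arc, Mutex};

    // Stand-in for PeerProgress: cloning copies the Arc, not the guarded data.
    #[derive(Clone)]
    struct Progress {
        next_index: Arc<Mutex<u64>>, // plays the role of SharedIndexes.next_index
    }

    fn main() {
        let a = Progress { next_index: Arc::new(Mutex::new(1)) };
        let b = a.clone();

        // "record_success" through one handle ...
        *a.next_index.lock().unwrap() = 42;
        // ... is observable through the other.
        assert_eq!(*b.next_index.lock().unwrap(), 42);
    }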

+ 6 - 16
src/sync_log_entries.rs

@@ -1,4 +1,4 @@
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::Ordering;
 use std::sync::Arc;
 
 use parking_lot::{Condvar, Mutex};
@@ -77,9 +77,6 @@ pub(crate) fn create(
     )
 }
 
-#[repr(align(64))]
-struct Opening(Arc<AtomicUsize>);
-
 enum SyncLogEntriesOperation<Command> {
     AppendEntries(AppendEntriesArgs<Command>),
     InstallSnapshot(InstallSnapshotArgs),
@@ -127,12 +124,6 @@ impl<Command: ReplicableCommand> Raft<Command> {
         let sync_log_entry_daemon = move || {
             log::info!("{:?} sync log entries daemon running ...", this.me);
 
-            let mut openings = vec![];
-            openings.resize_with(this.peers.len(), || {
-                Opening(Arc::new(AtomicUsize::new(0)))
-            });
-            let openings = openings; // Not mutable beyond this point.
-
             let mut task_number = 0;
             while let Ok(event) = rx.recv() {
                 if !this.keep_running.load(Ordering::Relaxed) {
@@ -143,19 +134,19 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 }
                 for (i, rpc_client) in this.peers.iter().enumerate() {
                     if i != this.me.0 && event.should_schedule(Peer(i)) {
+                        let progress = &peer_progress[i];
                         if let Event::NewTerm(_term, index) = event {
-                            peer_progress[i].reset_progress(index);
+                            progress.reset_progress(index);
                         }
                         // Only schedule a new task if the last task has cleared
                         // the queue of RPC requests.
-                        if openings[i].0.fetch_add(1, Ordering::AcqRel) == 0 {
+                        if progress.should_schedule() {
                             task_number += 1;
                             this.thread_pool.spawn(Self::sync_log_entries(
                                 this.inner_state.clone(),
                                 rpc_client.clone(),
                                 this.sync_log_entries_comms.clone(),
-                                peer_progress[i].clone(),
-                                openings[i].0.clone(),
+                                progress.clone(),
                                 this.apply_command_signal.clone(),
                                 this.term_marker(),
                                 this.beat_ticker(i),
@@ -218,13 +209,12 @@ impl<Command: ReplicableCommand> Raft<Command> {
         rpc_client: impl RemoteRaft<Command>,
         comms: SyncLogEntriesComms,
         progress: PeerProgress,
-        opening: Arc<AtomicUsize>,
         apply_command_signal: Arc<Condvar>,
         term_marker: TermMarker<Command>,
         beat_ticker: DaemonBeatTicker,
         task_number: TaskNumber,
     ) {
-        if opening.swap(0, Ordering::AcqRel) == 0 {
+        if !progress.take_task() {
             return;
         }
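
The comment in the daemon loop above ("Only schedule a new task if the last task has cleared the queue of RPC requests") is the contract that should_schedule/take_task now encapsulate: should_schedule bumps the pending counter and reports true only when it was zero, and the spawned task claims everything pending with take_task, returning early if another task already drained the counter. Below is a minimal standalone sketch of that handoff, with placeholder names and a plain thread in place of the daemon's thread pool:

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    struct Progress {
        opening: AtomicUsize, // schedule requests not yet claimed by a task
    }

    impl Progress {
        // True only when no task is in flight; every call records one request.
        fn should_schedule(&self) -> bool {
            self.opening.fetch_add(1, Ordering::AcqRel) == 0
        }

        // Claim all pending requests; false means another task got there first.
        fn take_task(&self) -> bool {
            self.opening.swap(0, Ordering::AcqRel) != 0
        }
    }

    fn main() {
        let progress = Arc::new(Progress { opening: AtomicUsize::new(0) });

        // Daemon side: the first request after the counter hits zero spawns a
        // worker; later requests only bump the counter and are batched.
        if progress.should_schedule() {
            let progress = Arc::clone(&progress);
            std::thread::spawn(move || {
                // Worker side: exit if the pending requests were already claimed.
                if !progress.take_task() {
                    return;
                }
                // ... sync log entries to the peer here ...
            })
            .join()
            .unwrap();
        }
    }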