Browse Source

Clear the verify authority queue after a new commit is made.

Jing Yang 3 năm trước cách đây
mục cha
commit
07bf4917ed
1 tập tin đã thay đổi với 60 bổ sung12 xóa
  1. 60 12
      src/verify_authority.rs

+ 60 - 12
src/verify_authority.rs

@@ -1,6 +1,6 @@
 use crate::beat_ticker::{Beat, SharedBeatTicker};
 use crate::daemon_env::Daemon;
-use crate::{Raft, Term, HEARTBEAT_INTERVAL_MILLIS};
+use crate::{Index, Raft, Term, HEARTBEAT_INTERVAL_MILLIS};
 use crossbeam_utils::sync::{Parker, Unparker};
 use parking_lot::Mutex;
 use std::collections::VecDeque;
@@ -21,6 +21,7 @@ pub(crate) enum VerifyAuthorityResult {
 /// Token stored in the internal queue for authority verification. Each token
 /// represents one verification request.
 struct VerifyAuthorityToken {
+    commit_index: Index,
     beats_moment: Vec<Beat>,
     rough_time: Instant,
     sender: tokio::sync::oneshot::Sender<VerifyAuthorityResult>,
@@ -108,6 +109,7 @@ impl VerifyAuthorityDaemon {
     pub fn verify_authority_async(
         &self,
         current_term: Term,
+        commit_index: Index,
     ) -> Option<tokio::sync::oneshot::Receiver<VerifyAuthorityResult>> {
         // The inflight beats are sent at least for `current_term`. This is
         // guaranteed by the fact that we immediately increase beats for all
@@ -135,6 +137,7 @@ impl VerifyAuthorityDaemon {
 
         let (sender, receiver) = tokio::sync::oneshot::channel();
         let token = VerifyAuthorityToken {
+            commit_index,
             beats_moment,
             rough_time: Instant::now(),
             sender,
@@ -145,15 +148,55 @@ impl VerifyAuthorityDaemon {
     }
 
     /// Run one iteration of the verify authority daemon.
-    /// Fetches the newest successful RPC response from peers, and mark verify
-    /// authority requests as complete if they are covered by more than half of
-    /// the replicas.
-    pub fn run_verify_authority_iteration(&self) {
+    pub fn run_verify_authority_iteration(
+        &self,
+        current_term: Term,
+        commit_index: Index,
+    ) {
         // Opportunistic check: do nothing if we don't have any requests.
         if self.state.lock().queue.is_empty() {
             return;
         }
 
+        self.clear_committed_requests(current_term, commit_index);
+        self.clear_ticked_requests();
+        self.removed_expired_requests(current_term);
+    }
+
+    /// Clears all requests that have seen at least one commit.
+    /// This function handles the following scenario: a verify authority request
+    /// was received, when the `commit_index` was at C. Later as the leader we
+    /// moved the commit index to at least C+1. That implies that when the
+    /// request was first received, no other new commits after C could have been
+    /// added to the log, either by this replica or others. It then follows that
+    /// we can claim we had authority at that point.
+    fn clear_committed_requests(
+        &self,
+        current_term: Term,
+        commit_index: Index,
+    ) {
+        let mut state = self.state.lock();
+        if current_term != state.term {
+            return;
+        }
+
+        // Note the commit_index in the queue might not be in increasing order.
+        // We could still have requests that have a smaller commit_index after
+        // this sweep. That is an acceptable tradeoff we are taking.
+        while let Some(head) = state.queue.pop_front() {
+            if head.commit_index >= commit_index {
+                state.queue.push_front(head);
+                break;
+            }
+            let _ = head.sender.send(VerifyAuthorityResult::Success);
+            state.start.0 += 1;
+        }
+    }
+
+    /// Fetches the newest successful RPC response from peers, and mark verify
+    /// authority requests as complete if they are covered by more than half of
+    /// the replicas.
+    fn clear_ticked_requests(&self) {
         for (peer_index, beat_ticker) in self.beat_tickers.iter().enumerate() {
             // Fetches the newest successful RPC response from the current peer.
             let ticked = beat_ticker.ticked();
@@ -222,6 +265,7 @@ impl VerifyAuthorityDaemon {
                 .queue
                 .pop_front()
                 .map(|head| head.sender.send(VerifyAuthorityResult::TimedOut));
+            state.start.0 += 1;
         }
     }
 
@@ -248,9 +292,12 @@ impl<Command: 'static + Send> Raft<Command> {
         let join_handle = std::thread::spawn(move || {
             while keep_running.load(Ordering::Acquire) {
                 parker.park_timeout(Self::BEAT_RECORDING_MAX_PAUSE);
-                this_daemon.run_verify_authority_iteration();
-                let current_term = rf.lock().current_term;
-                this_daemon.removed_expired_requests(current_term);
+                let (current_term, commit_index) = {
+                    let rf = rf.lock();
+                    (rf.current_term, rf.commit_index)
+                };
+                this_daemon
+                    .run_verify_authority_iteration(current_term, commit_index);
             }
         });
         self.daemon_env
@@ -263,16 +310,17 @@ impl<Command: 'static + Send> Raft<Command> {
     pub(crate) fn verify_authority_async(
         &self,
     ) -> Option<impl Future<Output = VerifyAuthorityResult>> {
-        let term = {
+        let (term, commit_index) = {
             let rf = self.inner_state.lock();
             if !rf.is_leader() {
                 return None;
             }
 
-            rf.current_term
+            (rf.current_term, rf.commit_index)
         };
-        let receiver =
-            self.verify_authority_daemon.verify_authority_async(term);
+        let receiver = self
+            .verify_authority_daemon
+            .verify_authority_async(term, commit_index);
         receiver.map(|receiver| async move {
             receiver
                 .await