Explorar o código

Implement the publibc verify authority API on Raft.

Jing Yang %!s(int64=3) %!d(string=hai) anos
pai
achega
8210fcfba7
Modificáronse 3 ficheiros con 78 adicións e 11 borrados
  1. 1 1
      Cargo.toml
  2. 2 0
      src/election.rs
  3. 75 10
      src/verify_authority.rs

+ 1 - 1
Cargo.toml

@@ -24,7 +24,7 @@ parking_lot = "0.12"
 rand = "0.8"
 serde = "1.0"
 serde_derive = "1.0"
-tokio = { version = "1.7", features = ["net", "rt-multi-thread", "time", "parking_lot"] }
+tokio = { version = "1.7", features = ["net", "rt-multi-thread", "sync", "time", "parking_lot"] }
 test_utils = { path = "test_utils", optional = true }
 
 [features]

+ 2 - 0
src/election.rs

@@ -370,6 +370,8 @@ where
             // Reset the verify authority daemon before sending heartbeats to
             // followers. This is critical to the correctness of verifying
             // authority.
+            // No verity authority request can go through before the reset is
+            // done, since we are holding the raft lock.
             verify_authority_daemon.reset_state(term);
 
             // Sync all logs now.

+ 75 - 10
src/verify_authority.rs

@@ -1,9 +1,10 @@
 use crate::beat_ticker::{Beat, SharedBeatTicker};
 use crate::daemon_env::Daemon;
-use crate::{Index, Raft, Term, HEARTBEAT_INTERVAL_MILLIS};
+use crate::{Raft, Term, HEARTBEAT_INTERVAL_MILLIS};
 use crossbeam_utils::sync::{Parker, Unparker};
 use parking_lot::Mutex;
 use std::collections::VecDeque;
+use std::future::Future;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -17,20 +18,12 @@ pub(crate) enum VerifyAuthorityResult {
     TimedOut,
 }
 
-/// Ticket issued to each client that requested a authority verification.
-#[allow(dead_code)]
-pub(crate) struct VerifyAuthorityTicket {
-    term: Term,
-    index: Index,
-    receiver: futures_channel::oneshot::Receiver<VerifyAuthorityResult>,
-}
-
 /// Token stored in the internal queue for authority verification. Each token
 /// represents one verification request.
 struct VerifyAuthorityToken {
     beats_moment: Vec<Beat>,
     rough_time: Instant,
-    sender: futures_channel::oneshot::Sender<VerifyAuthorityResult>,
+    sender: tokio::sync::oneshot::Sender<VerifyAuthorityResult>,
 }
 
 #[derive(Clone, Copy, Default, Eq, Ord, PartialOrd, PartialEq)]
@@ -100,6 +93,55 @@ impl VerifyAuthorityDaemon {
 
     pub fn reset_state(&self, term: Term) {
         self.state.lock().reset(term);
+        // Increase all beats by one to make sure upcoming verify authority
+        // requests wait for beats in the current term. This in fact creates
+        // phantom beats that will never be marked as completed by themselves.
+        // They will be automatically `ticked()` when newer (real) beats are
+        // created, sent and `ticked()`.
+        for beat_ticker in self.beat_tickers.iter() {
+            beat_ticker.next_beat();
+        }
+    }
+
+    /// Enqueues a verify authority request. Returns a receiver of the
+    /// verification result. Returns None if the term has passed.
+    pub fn verify_authority_async(
+        &self,
+        current_term: Term,
+    ) -> Option<tokio::sync::oneshot::Receiver<VerifyAuthorityResult>> {
+        // The inflight beats are sent at least for `current_term`. This is
+        // guaranteed by the fact that we immediately increase beats for all
+        // peers after being elected. It further guarantees that the newest
+        // beats we get here are at least as new as the phantom beats created by
+        // `Self::reset_state()`.
+        let beats_moment = self
+            .beat_tickers
+            .iter()
+            .map(|beat_ticker| beat_ticker.current_beat())
+            .collect();
+
+        // The inflight beats could also be for any term after `current_term`.
+        // We must check if the term stored in the daemon is the same as
+        // `current_term`.
+        // `state.term` could be smaller than `current_term`, if a new term is
+        // started by someone else and we lost leadership.
+        // `state.term` could be greater than `current_term`, if we lost
+        // leadership but are elected leader again in a following term.
+        // In both cases, we cannot confirm the leadership at `current_term`.
+        let mut state = self.state.lock();
+        if state.term != current_term {
+            return None;
+        }
+
+        let (sender, receiver) = tokio::sync::oneshot::channel();
+        let token = VerifyAuthorityToken {
+            beats_moment,
+            rough_time: Instant::now(),
+            sender,
+        };
+        state.queue.push_back(token);
+
+        Some(receiver)
     }
 
     /// Run one iteration of the verify authority daemon.
@@ -214,4 +256,27 @@ impl<Command: 'static + Send> Raft<Command> {
         self.daemon_env
             .watch_daemon(Daemon::VerifyAuthority, join_handle);
     }
+
+    /// Create a verify authority request. Returns None if we are not the
+    /// leader.
+    #[allow(dead_code)]
+    pub(crate) fn verify_authority_async(
+        &self,
+    ) -> Option<impl Future<Output = VerifyAuthorityResult>> {
+        let term = {
+            let rf = self.inner_state.lock();
+            if !rf.is_leader() {
+                return None;
+            }
+
+            rf.current_term
+        };
+        let receiver =
+            self.verify_authority_daemon.verify_authority_async(term);
+        receiver.map(|receiver| async move {
+            receiver
+                .await
+                .expect("Verify authority daemon never drops senders")
+        })
+    }
 }