소스 검색

Create the verify authority daemon to support read-only requests.

Jing Yang 3 년 전
부모
커밋
01fab3f446
5개의 변경된 파일325개의 추가작업 그리고 0개의 파일을 삭제
  1. 90 0
      src/beat_ticker.rs
  2. 1 0
      src/daemon_env.rs
  3. 9 0
      src/election.rs
  4. 8 0
      src/lib.rs
  5. 217 0
      src/verify_authority.rs

+ 90 - 0
src/beat_ticker.rs

@@ -0,0 +1,90 @@
+#![allow(unused)]
+use std::ops::Deref;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+
/// A beat is one request sent to a peer with a success response.
/// The `usize` within is the unique ID of the request.
#[derive(Eq, Ord, PartialOrd, PartialEq)]
pub(crate) struct Beat(usize);

/// A `BeatTicker` issues unique request IDs and records successful runs.
///
/// Each peer owns its own `BeatTicker`. Requests are ordered by the wall
/// time at which they call [`BeatTicker::next_beat`]. Every successful
/// request is an acknowledgement from the peer that the sender (this
/// instance) is the leader.
///
/// Leadership within one term is continuous. Suppose we were elected leader
/// at absolute time `X`, and a message sent at time `Y` (`Y > X`) is
/// acknowledged by a peer: that peer then recognizes our leadership over the
/// entire interval `[X, Y]`.
///
/// Since `X` is fixed for a given term, newer acknowledgements only extend
/// the interval, so only the latest confirmation from each peer needs to be
/// kept.
///
/// For any time `T` with `T > X`, if more than `N / 2` peers confirmed the
/// leadership after `T`, we must have been the leader at time `T`.
pub(crate) struct BeatTicker {
    // A 32-bit usize would overflow after roughly 25 days at 2000 QPS to a
    // single peer. Not a concern in practice:
    // 1. usize is usually 64-bit.
    // 2. 2000 QPS is an overestimate.
    beat_count: AtomicUsize,
    ticked: AtomicUsize,
}

impl BeatTicker {
    /// Creates a `BeatTicker`. The first ID handed out by `next_beat` is 1;
    /// the record of successful requests starts at ID 0 (i.e. none yet).
    fn create() -> Self {
        Self {
            beat_count: AtomicUsize::new(1),
            ticked: AtomicUsize::new(0),
        }
    }

    /// Issues the next unique request ID.
    pub fn next_beat(&self) -> Beat {
        let issued = self.beat_count.fetch_add(1, Ordering::AcqRel);
        assert_ne!(issued, usize::MAX, "BeatTicker count overflow");
        Beat(issued)
    }

    /// Returns the newest request ID, i.e. the one that `next_beat` would
    /// issue next.
    pub fn current_beat(&self) -> Beat {
        Beat(self.beat_count.load(Ordering::Acquire))
    }

    /// Marks a beat as successful. Out-of-order ticks are fine: only the
    /// largest ID ever ticked is retained.
    pub fn tick(&self, beat: Beat) {
        self.ticked.fetch_max(beat.0, Ordering::AcqRel);
    }

    /// Returns the ID of the last successful request.
    pub fn ticked(&self) -> Beat {
        Beat(self.ticked.load(Ordering::Acquire))
    }
}

/// A cloneable smart pointer sharing a single `BeatTicker` among threads and
/// tasks.
#[derive(Clone)]
pub(crate) struct SharedBeatTicker(Arc<BeatTicker>);

impl SharedBeatTicker {
    /// Creates a fresh ticker, wrapped for sharing.
    pub fn create() -> Self {
        Self(Arc::new(BeatTicker::create()))
    }
}

impl Deref for SharedBeatTicker {
    type Target = BeatTicker;

    fn deref(&self) -> &Self::Target {
        self.0.as_ref()
    }
}

+ 1 - 0
src/daemon_env.rs

@@ -41,6 +41,7 @@ pub(crate) enum Daemon {
     ElectionTimer,
     SyncLogEntries,
     ApplyCommand,
+    VerifyAuthority,
 }
 
 #[derive(Debug, Default)]

+ 9 - 0
src/election.rs

@@ -8,6 +8,7 @@ use rand::{thread_rng, Rng};
 use crate::daemon_env::Daemon;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
+use crate::verify_authority::VerifyAuthorityDaemon;
 use crate::{Peer, Raft, RaftState, RemoteRaft, RequestVoteArgs, State, Term};
 
 #[derive(Default)]
@@ -277,6 +278,7 @@ where
             rx,
             self.election.clone(),
             self.new_log_entry.clone().unwrap(),
+            self.verify_authority_daemon.clone(),
         ));
         Some(tx)
     }
@@ -302,6 +304,7 @@ where
         None
     }
 
+    #[allow(clippy::too_many_arguments)]
     async fn count_vote_util_cancelled(
         me: Peer,
         term: Term,
@@ -310,6 +313,7 @@ where
         cancel_token: futures_channel::oneshot::Receiver<()>,
         election: Arc<ElectionState>,
         new_log_entry: SharedSender<Option<Peer>>,
+        verify_authority_daemon: VerifyAuthorityDaemon,
     ) {
         let quorum = votes.len() >> 1;
         let mut vote_count = 0;
@@ -363,6 +367,11 @@ where
             for item in rf.current_step.iter_mut() {
                 *item = 0;
             }
+            // Reset the verify authority daemon before sending heartbeats to
+            // followers. This is critical to the correctness of verifying
+            // authority.
+            verify_authority_daemon.reset_state(term);
+
             // Sync all logs now.
             let _ = new_log_entry.send(None);
         }

+ 8 - 0
src/lib.rs

@@ -19,8 +19,10 @@ pub(crate) use crate::raft_state::State;
 pub use crate::remote_raft::RemoteRaft;
 pub use crate::snapshot::Snapshot;
 use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
+use crate::verify_authority::VerifyAuthorityDaemon;
 
 mod apply_command;
+mod beat_ticker;
 mod daemon_env;
 mod election;
 mod heartbeats;
@@ -36,6 +38,7 @@ mod snapshot;
 mod sync_log_entries;
 mod term_marker;
 pub mod utils;
+mod verify_authority;
 
 #[derive(
     Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize,
@@ -67,6 +70,7 @@ pub struct Raft<Command> {
     keep_running: Arc<AtomicBool>,
     election: Arc<ElectionState>,
     snapshot_daemon: SnapshotDaemon,
+    verify_authority_daemon: VerifyAuthorityDaemon,
 
     thread_pool: Arc<tokio::runtime::Runtime>,
 
@@ -206,6 +210,7 @@ where
             keep_running: Arc::new(Default::default()),
             election: Arc::new(election),
             snapshot_daemon: Default::default(),
+            verify_authority_daemon: VerifyAuthorityDaemon::create(peer_size),
             thread_pool: Arc::new(thread_pool),
             daemon_env,
             stop_wait_group: WaitGroup::new(),
@@ -213,6 +218,8 @@ where
 
         this.keep_running.store(true, Ordering::SeqCst);
         // Running in a standalone thread.
+        this.run_verify_authority_daemon();
+        // Running in a standalone thread.
         this.run_snapshot_daemon(max_state_size_bytes, request_snapshot);
         // Running in a standalone thread.
         this.run_log_entry_daemon();
@@ -269,6 +276,7 @@ where
         self.new_log_entry.take().map(|n| n.send(None));
         self.apply_command_signal.notify_all();
         self.snapshot_daemon.kill();
+        self.verify_authority_daemon.kill();
         // We cannot easily combine stop_wait_group into DaemonEnv because of
         // shutdown dependencies. The thread pool is not managed by DaemonEnv,
         // but it cannot be shutdown until all daemons are. On the other hand

+ 217 - 0
src/verify_authority.rs

@@ -0,0 +1,217 @@
+use crate::beat_ticker::{Beat, SharedBeatTicker};
+use crate::daemon_env::Daemon;
+use crate::{Index, Raft, Term, HEARTBEAT_INTERVAL_MILLIS};
+use crossbeam_utils::sync::{Parker, Unparker};
+use parking_lot::Mutex;
+use std::collections::VecDeque;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+/// The result returned to a verify authority request.
+/// This request is not directly exposed to end users. Instead it is used
+/// internally to implement no-commit read-only requests.
+pub(crate) enum VerifyAuthorityResult {
+    /// A quorum of peers confirmed our leadership after the request was
+    /// made.
+    Success,
+    /// The daemon state was reset for a newer term before the request could
+    /// be verified.
+    TermElapsed,
+    /// Leadership was apparently lost and the request stayed unconfirmed
+    /// past the expiration window.
+    TimedOut,
+}
+
+/// Ticket issued to each client that requested an authority verification.
+#[allow(dead_code)]
+pub(crate) struct VerifyAuthorityTicket {
+    /// Term captured with the request. Not read anywhere in this change
+    /// (hence `allow(dead_code)`); presumably filled in at the issuing site.
+    term: Term,
+    /// Log index captured with the request — NOTE(review): the issuing site
+    /// is not part of this change; confirm exact semantics there.
+    index: Index,
+    /// Receives the verification outcome once the daemon resolves it.
+    receiver: futures_channel::oneshot::Receiver<VerifyAuthorityResult>,
+}
+
+/// Token stored in the internal queue for authority verification. Each token
+/// represents one verification request.
+struct VerifyAuthorityToken {
+    /// One beat per peer. Peer `i` is considered to have confirmed this
+    /// request once `beat_tickers[i].ticked() >= beats_moment[i]` (see
+    /// `run_verify_authority_iteration`). Presumably a `current_beat()`
+    /// snapshot taken when the request was enqueued — the enqueue site is
+    /// not part of this change; TODO confirm.
+    beats_moment: Vec<Beat>,
+    /// Approximate timestamp; its only read in this change is the
+    /// expiration check in `removed_expired_requests`.
+    rough_time: Instant,
+    /// Completion channel; the matching receiver presumably lives in
+    /// `VerifyAuthorityTicket::receiver`.
+    sender: futures_channel::oneshot::Sender<VerifyAuthorityResult>,
+}
+
+/// Absolute position in the verification request queue, counted from the
+/// last reset. It keeps growing even as processed requests are drained from
+/// the physical `VecDeque` — see `VerifyAuthorityState::covered`.
+#[derive(Clone, Copy, Default, Eq, Ord, PartialOrd, PartialEq)]
+struct QueueIndex(usize);
+
+/// The state of this daemon, should be protected by a mutex.
+struct VerifyAuthorityState {
+    /// The current term. Might be behind the real term in the cluster.
+    term: Term,
+    /// Pending requests to verify authority.
+    queue: VecDeque<VerifyAuthorityToken>,
+    /// Number of requests that have been processed, i.e. the absolute
+    /// `QueueIndex` of the current front of `queue`.
+    start: QueueIndex,
+    /// A vector of queue indexes. Each element in this vector indicates the
+    /// index of the first request that has not been confirmed by the
+    /// corresponding peer.
+    /// These indexes include all processed requests. They will never go down.
+    covered: Vec<QueueIndex>,
+}
+
+impl VerifyAuthorityState {
+    pub fn create(peer_count: usize) -> Self {
+        VerifyAuthorityState {
+            term: Term(0),
+            queue: Default::default(),
+            start: QueueIndex(0),
+            covered: vec![QueueIndex(0); peer_count],
+        }
+    }
+
+    pub fn reset(&mut self, term: Term) {
+        self.clear_tickets();
+
+        self.term = term;
+        self.start = QueueIndex(0);
+        for item in self.covered.iter_mut() {
+            *item = QueueIndex(0)
+        }
+    }
+
+    pub fn clear_tickets(&mut self) {
+        for token in self.queue.drain(..) {
+            let _ = token.sender.send(VerifyAuthorityResult::TermElapsed);
+        }
+    }
+}
+
+/// Daemon that resolves verify authority requests by watching per-peer
+/// `BeatTicker`s for acknowledged requests.
+///
+/// NOTE(review): `unparker` is stored per-clone rather than behind the
+/// `Arc`, so clones taken before `run_verify_authority_daemon` installs it
+/// hold `None`, and calling `kill()` on them is a no-op. In this change
+/// `kill()` is only invoked on the copy that receives the unparker.
+#[derive(Clone)]
+pub(crate) struct VerifyAuthorityDaemon {
+    /// Shared mutable state: the request queue and per-peer progress.
+    state: Arc<Mutex<VerifyAuthorityState>>,
+    /// One ticker per peer, recording acknowledged request IDs.
+    beat_tickers: Vec<SharedBeatTicker>,
+    /// Wakes the daemon thread; `None` until the daemon is started.
+    unparker: Option<Unparker>,
+}
+
+impl VerifyAuthorityDaemon {
+    pub fn create(peer_count: usize) -> Self {
+        Self {
+            state: Arc::new(Mutex::new(VerifyAuthorityState::create(
+                peer_count,
+            ))),
+            beat_tickers: (0..peer_count)
+                .map(|_| SharedBeatTicker::create())
+                .collect(),
+            unparker: None,
+        }
+    }
+
+    pub fn reset_state(&self, term: Term) {
+        self.state.lock().reset(term);
+    }
+
+    /// Run one iteration of the verify authority daemon.
+    /// Fetches the newest successful RPC response from peers, and mark verify
+    /// authority requests as complete if they are covered by more than half of
+    /// the replicas.
+    pub fn run_verify_authority_iteration(&self) {
+        // Opportunistic check: do nothing if we don't have any requests.
+        if self.state.lock().queue.is_empty() {
+            return;
+        }
+
+        for (peer_index, beat_ticker) in self.beat_tickers.iter().enumerate() {
+            // Fetches the newest successful RPC response from the current peer.
+            let ticked = beat_ticker.ticked();
+            let mut state = self.state.lock();
+            // Update progress with `ticked`. All requests that came before
+            // `ticked` now have one more votes of leader authority from the
+            // current peer.
+            let first_not_ticked_index = state.queue.partition_point(|token| {
+                token.beats_moment[peer_index] <= ticked
+            });
+            assert!(first_not_ticked_index >= state.covered[peer_index].0);
+            state.covered[peer_index].0 = first_not_ticked_index;
+
+            // Count the requests that has more than N / 2 votes. We always have
+            // the vote from ourselves, but the value is 0 in `covered` array.
+            let mut sorted_covered = state.covered.to_owned();
+            sorted_covered.sort_unstable();
+            let mid = sorted_covered.len() / 2 + 1;
+            let new_start = sorted_covered[mid];
+
+            // All requests before `new_start` is now verified.
+            let verified = new_start.0 - state.start.0;
+            for token in state.queue.drain(..verified) {
+                for (index, beat) in token.beats_moment.iter().enumerate() {
+                    assert!(self.beat_tickers[index].ticked() >= *beat);
+                }
+                let _ = token.sender.send(VerifyAuthorityResult::Success);
+            }
+            // Move the queue starting point.
+            state.start = new_start;
+        }
+    }
+
+    const VERIFY_AUTHORITY_REQUEST_EXPIRATION: Duration =
+        Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS * 2);
+
+    /// Remove expired requests if we are no longer the leader.
+    /// If we have lost leadership, we are unlikely to receive confirmations
+    /// of past leadership state from peers. Requests are expired after two
+    /// heartbeat period have passed. We do not immediately cancel all incoming
+    /// requests, in hope that we could still answer them accurately without
+    /// breaking the consistency guarantee.
+    fn removed_expired_requests(&self, current_term: Term) {
+        let mut state = self.state.lock();
+        // Return if we are still the leader, or we become the leader again.
+        //
+        // Note that we do not hold the main raft state lock, thus the value of
+        // `current_term` might not be up-to-date. We only update `state.term`
+        // after an election. If in a term after `current_term`, we are elected
+        // leader again, `state.term` could be updated and thus greater than the
+        // (now stale) `current_term`. In that case, the queue should have been
+        // reset. There will be no expired request to remove.
+        if state.term >= current_term {
+            return;
+        }
+
+        let expiring_line =
+            Instant::now() - Self::VERIFY_AUTHORITY_REQUEST_EXPIRATION;
+        // Assuming bounded clock skew, otherwise we will lose efficiency.
+        let expired =
+            |head: &VerifyAuthorityToken| head.rough_time < expiring_line;
+        // Note rough_time might not be in increasing order, so we might still
+        // have requests that are expired in the queue after the sweep.
+        while state.queue.front().map_or(false, expired) {
+            state
+                .queue
+                .pop_front()
+                .map(|head| head.sender.send(VerifyAuthorityResult::TimedOut));
+        }
+    }
+
+    pub fn kill(&self) {
+        if let Some(unparker) = self.unparker.as_ref() {
+            unparker.unpark();
+        }
+    }
+}
+
+impl<Command: 'static + Send> Raft<Command> {
+    /// Upper bound on how long the daemon thread sleeps between two
+    /// iterations when it is not explicitly unparked.
+    const BEAT_RECORDING_MAX_PAUSE: Duration = Duration::from_millis(20);
+
+    /// Creates a thread and runs the verify authority daemon.
+    pub(crate) fn run_verify_authority_daemon(&mut self) {
+        let parker = Parker::new();
+        let unparker = parker.unparker().clone();
+        // Install the unparker before cloning the daemon below, so that
+        // both our copy (on which `kill()` is called) and the thread's
+        // clone carry it.
+        self.verify_authority_daemon.unparker.replace(unparker);
+
+        let keep_running = self.keep_running.clone();
+        let this_daemon = self.verify_authority_daemon.clone();
+        let rf = self.inner_state.clone();
+
+        let join_handle = std::thread::spawn(move || {
+            while keep_running.load(Ordering::Acquire) {
+                // Sleep until unparked (e.g. by `kill()`) or until the pause
+                // elapses, then harvest newly ticked beats.
+                parker.park_timeout(Self::BEAT_RECORDING_MAX_PAUSE);
+                this_daemon.run_verify_authority_iteration();
+                // `current_term` may be stale the moment the lock is
+                // released; `removed_expired_requests` is written to
+                // tolerate that (see its comments).
+                let current_term = rf.lock().current_term;
+                this_daemon.removed_expired_requests(current_term);
+            }
+        });
+        // Hand the join handle to the daemon environment, presumably so the
+        // thread is monitored/joined at shutdown — see `daemon_env`.
+        self.daemon_env
+            .watch_daemon(Daemon::VerifyAuthority, join_handle);
+    }
+}