Procházet zdrojové kódy

Merge branch 'verify_authority'

Jing Yang před 3 roky
rodič
revize
c5ea702fdb
10 změnil soubory, kde provedl 712 přidání a 25 odebrání
  1. 1 1
      Cargo.toml
  2. 77 18
      kvraft/src/server.rs
  3. 89 0
      src/beat_ticker.rs
  4. 1 0
      src/daemon_env.rs
  5. 25 2
      src/election.rs
  6. 36 2
      src/heartbeats.rs
  7. 12 0
      src/lib.rs
  8. 4 0
      src/raft_state.rs
  9. 14 2
      src/sync_log_entries.rs
  10. 453 0
      src/verify_authority.rs

+ 1 - 1
Cargo.toml

@@ -24,7 +24,7 @@ parking_lot = "0.12"
 rand = "0.8"
 serde = "1.0"
 serde_derive = "1.0"
-tokio = { version = "1.7", features = ["net", "rt-multi-thread", "time", "parking_lot"] }
+tokio = { version = "1.7", features = ["net", "rt-multi-thread", "sync", "time", "parking_lot"] }
 test_utils = { path = "test_utils", optional = true }
 
 [features]

+ 77 - 18
kvraft/src/server.rs

@@ -9,7 +9,10 @@ use futures::FutureExt;
 use parking_lot::Mutex;
 use serde_derive::{Deserialize, Serialize};
 
-use ruaft::{ApplyCommandMessage, Persister, Raft, RemoteRaft, Term};
+use ruaft::{
+    ApplyCommandMessage, Index, Persister, Raft, RemoteRaft, Term,
+    VerifyAuthorityResult,
+};
 use test_utils::log_with;
 use test_utils::thread_local_logger::LocalLogger;
 
@@ -36,6 +39,7 @@ pub struct UniqueKVOp {
 
 #[derive(Default, Serialize, Deserialize)]
 struct KVServerState {
+    raft_index: Index,
     kv: HashMap<String, String>,
     debug_kv: HashMap<String, String>,
     applied_op: HashMap<ClerkId, (UniqueId, CommitResult)>,
@@ -50,6 +54,11 @@ struct KVServerState {
             >,
         ),
     >,
+    #[serde(skip)]
+    index_subscribers: HashMap<
+        Index,
+        Vec<(String, futures::channel::oneshot::Sender<Option<String>>)>,
+    >,
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -198,6 +207,20 @@ impl KVServer {
         };
     }
 
+    fn process_read_requests(&self, index: Index) {
+        let mut state = self.state.lock();
+        assert!(index > state.raft_index);
+        for index in state.raft_index..=index {
+            if let Some(read_requests) = state.index_subscribers.remove(&index)
+            {
+                for (key, sender) in read_requests {
+                    let _ = sender.send(state.kv.get(&key).cloned());
+                }
+            }
+        }
+        state.raft_index = index;
+    }
+
     fn restore_state(&self, mut new_state: KVServerState) {
         let mut state = self.state.lock();
         // Cleanup all existing queries.
@@ -232,6 +255,7 @@ impl KVServer {
                                 command.me,
                                 command.op,
                             );
+                            this.process_read_requests(index);
                             if let Some(snapshot) = snapshot_holder
                                 .take_snapshot(&this.state.lock(), index)
                             {
@@ -247,6 +271,42 @@ impl KVServer {
         });
     }
 
+    async fn block_for_read(
+        &self,
+        key: String,
+    ) -> Result<Option<String>, KVError> {
+        let result_fut = match self.rf.verify_authority_async() {
+            Some(result_fut) => result_fut,
+            None => return Err(KVError::NotLeader),
+        };
+        let index =
+            match tokio::time::timeout(Self::DEFAULT_TIMEOUT, result_fut).await
+            {
+                Ok(VerifyAuthorityResult::Success(index)) => index,
+                Ok(VerifyAuthorityResult::TermElapsed) => {
+                    return Err(KVError::NotLeader)
+                }
+                Ok(VerifyAuthorityResult::TimedOut) => {
+                    return Err(KVError::TimedOut)
+                }
+                Err(_e) => return Err(KVError::TimedOut),
+            };
+        let receiver = {
+            let state = self.state.lock();
+            if state.raft_index >= index {
+                return Ok(state.kv.get(&key).cloned());
+            }
+            let (sender, receiver) = futures::channel::oneshot::channel();
+            // The mutex guard is moved into this scope and dropped here.
+            let mut state = state;
+            let queue = state.index_subscribers.entry(index).or_default();
+            queue.push((key, sender));
+            receiver
+        };
+
+        receiver.await.map_err(|_e| KVError::TimedOut)
+    }
+
     const UNSEEN_TERM: usize = 0;
     const ATTEMPTING_TERM: usize = usize::MAX;
     async fn block_for_commit(
@@ -370,31 +430,30 @@ impl KVServer {
 
     const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
 
-    pub async fn get(&self, args: GetArgs) -> GetReply {
-        let map_dup = match args.op {
-            GetEnum::AllowDuplicate => |r| Ok(r),
-            GetEnum::NoDuplicate => |_| Err(KVError::Conflict),
-        };
+    pub async fn commit_sentinel(&self, args: GetArgs) -> GetReply {
+        assert_eq!(args.op, GetEnum::NoDuplicate);
         let result_fut = self.block_for_commit(
             args.unique_id,
             KVOp::Get(args.key),
             Self::DEFAULT_TIMEOUT,
         );
         let result = match result_fut.await {
-            Ok(result) => Ok(result),
-            Err(CommitError::Duplicate(result)) => map_dup(result),
-            Err(CommitError::NotMe(result)) => map_dup(result),
+            Ok(CommitResult::Get(result)) => Ok(result),
+            Ok(CommitResult::Put) => Err(KVError::Conflict),
+            Ok(CommitResult::Append) => Err(KVError::Conflict),
+            Err(CommitError::Duplicate(_)) => Err(KVError::Conflict),
+            Err(CommitError::NotMe(_)) => Err(KVError::Conflict),
             Err(e) => Err(e.into()),
         };
-        let result = match result {
-            Ok(result) => result,
-            Err(e) => return GetReply { result: Err(e) },
-        };
-        let result = match result {
-            CommitResult::Get(result) => Ok(result),
-            CommitResult::Put => Err(KVError::Conflict),
-            CommitResult::Append => Err(KVError::Conflict),
-        };
+        GetReply { result }
+    }
+
+    pub async fn get(&self, args: GetArgs) -> GetReply {
+        if args.op == GetEnum::NoDuplicate {
+            return self.commit_sentinel(args).await;
+        }
+        assert_eq!(args.op, GetEnum::AllowDuplicate);
+        let result = self.block_for_read(args.key).await;
         GetReply { result }
     }
 

+ 89 - 0
src/beat_ticker.rs

@@ -0,0 +1,89 @@
+use std::ops::Deref;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+
+/// A beat is one request sent to a peer with a success response.
+/// The `usize` within is the unique ID of the request.
+#[derive(Debug, Eq, Ord, PartialOrd, PartialEq)]
+pub(crate) struct Beat(usize);
+
+/// A `BeatTicker` issues unique request IDs and records successful runs.
+///
+/// Each peer should have its own `BeatTicker`. Requests are ordered by the wall
+/// time they call `BeatTicker::next_beat()`. Each successful request marks the
+/// recognition from the peer that the sender (this instance) is the leader.
+///
+/// The leader status is continuous for a certain term. Imagine the following
+/// scenario. We are elected leader at absolute time `X`, and send a message
+/// to a peer at time `Y` (`Y > X`). The peer confirms the leader status by
+/// replying to the message. We can then assume that the peer recognizes us as
+/// the leader in entire time interval `[X, Y]`.
+///
+/// For each term, the starting point of the interval (`X`) is fixed. Newer
+/// requests extends the interval further. Thus, we only need to record the last
+/// confirmation from a peer.
+///
+/// At any time `T` with `T > X`, if more than `N/2` peers have confirmed the
+/// leader status after `T`, we can be sure that at time `T` we were the
+/// leader.
+pub(crate) struct BeatTicker {
+    // Beat count might overflow after 25 days at 2000 QPS to a single peer, if
+    // we assume usize is 32-bit.
+    // This should not be a problem because:
+    // 1. usize is usually 64-bit.
+    // 2. 2000 QPS is an overestimate.
+    beat_count: AtomicUsize,
+    ticked: AtomicUsize,
+}
+
+impl BeatTicker {
+    /// Creates a `BeatTicker`.
+    /// The first unique request ID issued by the ticker will be 1. The initial
+    /// value of the last successful request will start at ID 0.
+    fn create() -> Self {
+        Self {
+            beat_count: AtomicUsize::new(1),
+            ticked: AtomicUsize::new(0),
+        }
+    }
+
+    /// Issues the next unique request ID.
+    pub fn next_beat(&self) -> Beat {
+        let count = self.beat_count.fetch_add(1, Ordering::AcqRel);
+        assert_ne!(count, usize::MAX, "BeatTicker count overflow");
+        Beat(count)
+    }
+
+    /// Returns the newest beat (request ID).
+    pub fn current_beat(&self) -> Beat {
+        Beat(self.beat_count.load(Ordering::Acquire))
+    }
+
+    /// Marks a beat (request) as successful.
+    pub fn tick(&self, beat: Beat) {
+        self.ticked.fetch_max(beat.0, Ordering::AcqRel);
+    }
+
+    /// Returns the last successful beat (request ID).
+    pub fn ticked(&self) -> Beat {
+        Beat(self.ticked.load(Ordering::Acquire))
+    }
+}
+
+/// A smart pointer to share `BeatTicker` among threads and tasks.
+#[derive(Clone)]
+pub(crate) struct SharedBeatTicker(Arc<BeatTicker>);
+
+impl SharedBeatTicker {
+    pub fn create() -> Self {
+        Self(Arc::new(BeatTicker::create()))
+    }
+}
+
+impl Deref for SharedBeatTicker {
+    type Target = BeatTicker;
+
+    fn deref(&self) -> &Self::Target {
+        self.0.deref()
+    }
+}

+ 1 - 0
src/daemon_env.rs

@@ -41,6 +41,7 @@ pub(crate) enum Daemon {
     ElectionTimer,
     SyncLogEntries,
     ApplyCommand,
+    VerifyAuthority,
 }
 
 #[derive(Debug, Default)]

+ 25 - 2
src/election.rs

@@ -8,7 +8,10 @@ use rand::{thread_rng, Rng};
 use crate::daemon_env::Daemon;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
-use crate::{Peer, Raft, RaftState, RemoteRaft, RequestVoteArgs, State, Term};
+use crate::verify_authority::VerifyAuthorityDaemon;
+use crate::{
+    Peer, Persister, Raft, RaftState, RemoteRaft, RequestVoteArgs, State, Term,
+};
 
 #[derive(Default)]
 pub(crate) struct ElectionState {
@@ -69,7 +72,7 @@ impl ElectionState {
 // 3. serialize: they are converted to bytes to persist.
 impl<Command> Raft<Command>
 where
-    Command: 'static + Clone + Send + serde::Serialize,
+    Command: 'static + Clone + Default + Send + serde::Serialize,
 {
     /// Runs the election timer daemon that triggers elections.
     ///
@@ -277,6 +280,8 @@ where
             rx,
             self.election.clone(),
             self.new_log_entry.clone().unwrap(),
+            self.verify_authority_daemon.clone(),
+            self.persister.clone(),
         ));
         Some(tx)
     }
@@ -302,6 +307,7 @@ where
         None
     }
 
+    #[allow(clippy::too_many_arguments)]
     async fn count_vote_util_cancelled(
         me: Peer,
         term: Term,
@@ -310,6 +316,8 @@ where
         cancel_token: futures_channel::oneshot::Receiver<()>,
         election: Arc<ElectionState>,
         new_log_entry: SharedSender<Option<Peer>>,
+        verify_authority_daemon: VerifyAuthorityDaemon,
+        persister: Arc<dyn Persister>,
     ) {
         let quorum = votes.len() >> 1;
         let mut vote_count = 0;
@@ -363,6 +371,21 @@ where
             for item in rf.current_step.iter_mut() {
                 *item = 0;
             }
+            // Reset the verify authority daemon before sending heartbeats to
+            // followers. This is critical to the correctness of verifying
+            // authority.
+            // No verify authority request can go through before the reset is
+            // done, since we are holding the raft lock.
+            verify_authority_daemon.reset_state(term);
+
+            if rf.commit_index != rf.log.last_index_term().index {
+                rf.sentinel_commit_index =
+                    rf.log.add_command(term, Default::default());
+                persister.save_state(rf.persisted_state().into());
+            } else {
+                rf.sentinel_commit_index = rf.commit_index;
+            }
+
             // Sync all logs now.
             let _ = new_log_entry.send(None);
         }

+ 36 - 2
src/heartbeats.rs

@@ -5,8 +5,25 @@ use parking_lot::Mutex;
 
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, RPC_DEADLINE};
+use crate::verify_authority::DaemonBeatTicker;
 use crate::{AppendEntriesArgs, Raft, RaftState, RemoteRaft};
 
+#[derive(Clone)]
+pub(crate) struct HeartbeatsDaemon {
+    sender: tokio::sync::broadcast::Sender<()>,
+}
+
+impl HeartbeatsDaemon {
+    pub fn create() -> Self {
+        let (sender, _) = tokio::sync::broadcast::channel(1);
+        Self { sender }
+    }
+
+    pub fn trigger(&self) {
+        let _ = self.sender.send(());
+    }
+}
+
 // Command must be
 // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
 // 1. clone: they are copied to the persister.
@@ -34,6 +51,11 @@ where
                 let rf = self.inner_state.clone();
                 // A function that updates term with responses to heartbeats.
                 let term_marker = self.term_marker();
+                // A function that casts an "authoritative" vote with Ok()
+                // responses to heartbeats.
+                let beat_ticker = self.beat_ticker(peer_index);
+                // An on-demand trigger for sending a heartbeat.
+                let mut trigger = self.heartbeats_daemon.sender.subscribe();
                 // RPC client must be cloned into the outer async function.
                 let rpc_client = rpc_client.clone();
                 // Shutdown signal.
@@ -41,12 +63,17 @@ where
                 self.thread_pool.spawn(async move {
                     let mut interval = tokio::time::interval(interval);
                     while keep_running.load(Ordering::SeqCst) {
-                        interval.tick().await;
+                        let tick = interval.tick();
+                        let trigger = trigger.recv();
+                        futures_util::pin_mut!(tick, trigger);
+                        let _ =
+                            futures_util::future::select(tick, trigger).await;
                         if let Some(args) = Self::build_heartbeat(&rf) {
                             tokio::spawn(Self::send_heartbeat(
                                 rpc_client.clone(),
                                 args,
                                 term_marker.clone(),
+                                beat_ticker.clone(),
                             ));
                         }
                     }
@@ -87,7 +114,10 @@ where
         rpc_client: impl RemoteRaft<Command>,
         args: AppendEntriesArgs<Command>,
         term_watermark: TermMarker<Command>,
+        beat_ticker: DaemonBeatTicker,
     ) -> std::io::Result<()> {
+        let term = args.term;
+        let beat = beat_ticker.next_beat();
         // Passing a reference that is moved to the following closure.
         //
         // It won't work if the rpc_client of type Arc is moved into the closure
@@ -108,7 +138,11 @@ where
                 rpc_client.append_entries(args.clone())
             })
             .await?;
-        term_watermark.mark(response.term);
+        if term == response.term {
+            beat_ticker.tick(beat);
+        } else {
+            term_watermark.mark(response.term);
+        }
         Ok(())
     }
 }

+ 12 - 0
src/lib.rs

@@ -11,6 +11,7 @@ use crate::apply_command::ApplyCommandFnMut;
 pub use crate::apply_command::ApplyCommandMessage;
 use crate::daemon_env::{DaemonEnv, ThreadEnv};
 use crate::election::ElectionState;
+use crate::heartbeats::HeartbeatsDaemon;
 use crate::index_term::IndexTerm;
 use crate::persister::PersistedRaftState;
 pub use crate::persister::Persister;
@@ -19,8 +20,11 @@ pub(crate) use crate::raft_state::State;
 pub use crate::remote_raft::RemoteRaft;
 pub use crate::snapshot::Snapshot;
 use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
+use crate::verify_authority::VerifyAuthorityDaemon;
+pub use crate::verify_authority::VerifyAuthorityResult;
 
 mod apply_command;
+mod beat_ticker;
 mod daemon_env;
 mod election;
 mod heartbeats;
@@ -36,6 +40,7 @@ mod snapshot;
 mod sync_log_entries;
 mod term_marker;
 pub mod utils;
+mod verify_authority;
 
 #[derive(
     Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize,
@@ -67,6 +72,8 @@ pub struct Raft<Command> {
     keep_running: Arc<AtomicBool>,
     election: Arc<ElectionState>,
     snapshot_daemon: SnapshotDaemon,
+    verify_authority_daemon: VerifyAuthorityDaemon,
+    heartbeats_daemon: HeartbeatsDaemon,
 
     thread_pool: Arc<tokio::runtime::Runtime>,
 
@@ -206,6 +213,8 @@ where
             keep_running: Arc::new(Default::default()),
             election: Arc::new(election),
             snapshot_daemon: Default::default(),
+            verify_authority_daemon: VerifyAuthorityDaemon::create(peer_size),
+            heartbeats_daemon: HeartbeatsDaemon::create(),
             thread_pool: Arc::new(thread_pool),
             daemon_env,
             stop_wait_group: WaitGroup::new(),
@@ -213,6 +222,8 @@ where
 
         this.keep_running.store(true, Ordering::SeqCst);
         // Running in a standalone thread.
+        this.run_verify_authority_daemon();
+        // Running in a standalone thread.
         this.run_snapshot_daemon(max_state_size_bytes, request_snapshot);
         // Running in a standalone thread.
         this.run_log_entry_daemon();
@@ -269,6 +280,7 @@ where
         self.new_log_entry.take().map(|n| n.send(None));
         self.apply_command_signal.notify_all();
         self.snapshot_daemon.kill();
+        self.verify_authority_daemon.kill();
         // We cannot easily combine stop_wait_group into DaemonEnv because of
         // shutdown dependencies. The thread pool is not managed by DaemonEnv,
         // but it cannot be shutdown until all daemons are. On the other hand

+ 4 - 0
src/raft_state.rs

@@ -25,6 +25,9 @@ pub(crate) struct RaftState<Command> {
     pub state: State,
 
     pub leader_id: Peer,
+
+    // Index of the first commit of each term as the leader.
+    pub sentinel_commit_index: Index,
 }
 
 impl<Command: Default> RaftState<Command> {
@@ -40,6 +43,7 @@ impl<Command: Default> RaftState<Command> {
             current_step: vec![0; peer_size],
             state: State::Follower,
             leader_id: me,
+            sentinel_commit_index: 0,
         }
     }
 }

+ 14 - 2
src/sync_log_entries.rs

@@ -9,6 +9,7 @@ use crate::daemon_env::{Daemon, ErrorKind};
 use crate::index_term::IndexTerm;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
+use crate::verify_authority::DaemonBeatTicker;
 use crate::{
     AppendEntriesArgs, InstallSnapshotArgs, Peer, Raft, RaftState, RemoteRaft,
     Term, HEARTBEAT_INTERVAL_MILLIS,
@@ -101,6 +102,7 @@ where
                                 openings[i].0.clone(),
                                 this.apply_command_signal.clone(),
                                 this.term_marker(),
+                                this.beat_ticker(i),
                                 TaskNumber(task_number),
                             ));
                         }
@@ -168,6 +170,7 @@ where
         opening: Arc<AtomicUsize>,
         apply_command_signal: Arc<Condvar>,
         term_marker: TermMarker<Command>,
+        beat_ticker: DaemonBeatTicker,
         task_number: TaskNumber,
     ) {
         if opening.swap(0, Ordering::SeqCst) == 0 {
@@ -181,7 +184,8 @@ where
                 let term = args.term;
                 let prev_log_index = args.prev_log_index;
                 let match_index = args.prev_log_index + args.entries.len();
-                let succeeded = Self::append_entries(&rpc_client, args).await;
+                let succeeded =
+                    Self::append_entries(&rpc_client, args, beat_ticker).await;
 
                 (term, prev_log_index, match_index, succeeded)
             }
@@ -189,7 +193,9 @@ where
                 let term = args.term;
                 let prev_log_index = args.last_included_index;
                 let match_index = args.last_included_index;
-                let succeeded = Self::install_snapshot(&rpc_client, args).await;
+                let succeeded =
+                    Self::install_snapshot(&rpc_client, args, beat_ticker)
+                        .await;
 
                 (term, prev_log_index, match_index, succeeded)
             }
@@ -452,8 +458,10 @@ where
     async fn append_entries(
         rpc_client: &dyn RemoteRaft<Command>,
         args: AppendEntriesArgs<Command>,
+        beat_ticker: DaemonBeatTicker,
     ) -> std::io::Result<SyncLogEntriesResult> {
         let term = args.term;
+        let beat = beat_ticker.next_beat();
         let reply = retry_rpc(
             Self::APPEND_ENTRIES_RETRY,
             RPC_DEADLINE,
@@ -461,6 +469,7 @@ where
         )
         .await?;
         Ok(if reply.term == term {
+            beat_ticker.tick(beat);
             if let Some(committed) = reply.committed {
                 if reply.success {
                     SyncLogEntriesResult::Archived(committed)
@@ -492,8 +501,10 @@ where
     async fn install_snapshot(
         rpc_client: &dyn RemoteRaft<Command>,
         args: InstallSnapshotArgs,
+        beat_ticker: DaemonBeatTicker,
     ) -> std::io::Result<SyncLogEntriesResult> {
         let term = args.term;
+        let beat = beat_ticker.next_beat();
         let reply = retry_rpc(
             Self::INSTALL_SNAPSHOT_RETRY,
             RPC_DEADLINE,
@@ -501,6 +512,7 @@ where
         )
         .await?;
         Ok(if reply.term == term {
+            beat_ticker.tick(beat);
             if let Some(committed) = reply.committed {
                 SyncLogEntriesResult::Archived(committed)
             } else {

+ 453 - 0
src/verify_authority.rs

@@ -0,0 +1,453 @@
+use crate::beat_ticker::{Beat, SharedBeatTicker};
+use crate::daemon_env::Daemon;
+use crate::{Index, Raft, Term, HEARTBEAT_INTERVAL_MILLIS};
+use parking_lot::{Condvar, Mutex};
+use std::collections::VecDeque;
+use std::future::Future;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+/// The result returned to a verify authority request.
+/// This request is not directly exposed to end users. Instead it is used
+/// internally to implement no-commit read-only requests.
+#[derive(Debug)]
+pub enum VerifyAuthorityResult {
+    Success(Index),
+    TermElapsed,
+    TimedOut,
+}
+
+/// Token stored in the internal queue for authority verification. Each token
+/// represents one verification request.
+#[derive(Debug)]
+struct VerifyAuthorityToken {
+    commit_index: Index,
+    beats_moment: Vec<Beat>,
+    rough_time: Instant,
+    sender: tokio::sync::oneshot::Sender<VerifyAuthorityResult>,
+}
+
+#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialOrd, PartialEq)]
+struct QueueIndex(usize);
+
+/// The state of this daemon; it should be protected by a mutex.
+struct VerifyAuthorityState {
+    /// The current term. Might be behind the real term in the cluster.
+    term: Term,
+    /// Pending requests to verify authority.
+    queue: VecDeque<VerifyAuthorityToken>,
+    /// Number of requests that have been processed.
+    start: QueueIndex,
+    /// A vector of queue indexes. Each element in this vector indicates the
+    /// index of the first request that has not been confirmed by the
+    /// corresponding peer.
+    /// These indexes include all processed requests. They will never go down.
+    covered: Vec<QueueIndex>,
+}
+
+impl VerifyAuthorityState {
+    pub fn create(peer_count: usize) -> Self {
+        VerifyAuthorityState {
+            term: Term(0),
+            queue: Default::default(),
+            start: QueueIndex(0),
+            covered: vec![QueueIndex(0); peer_count],
+        }
+    }
+
+    pub fn reset(&mut self, term: Term) {
+        self.clear_tickets();
+
+        self.term = term;
+        self.start = QueueIndex(0);
+        for item in self.covered.iter_mut() {
+            *item = QueueIndex(0)
+        }
+    }
+
+    pub fn clear_tickets(&mut self) {
+        for token in self.queue.drain(..) {
+            let _ = token.sender.send(VerifyAuthorityResult::TermElapsed);
+        }
+    }
+}
+
+#[derive(Clone)]
+pub(crate) struct DaemonBeatTicker {
+    beat_ticker: SharedBeatTicker,
+    condvar: Arc<Condvar>,
+}
+
+impl DaemonBeatTicker {
+    pub fn next_beat(&self) -> Beat {
+        let beat = self.beat_ticker.next_beat();
+        beat
+    }
+
+    pub fn tick(&self, beat: Beat) {
+        self.beat_ticker.tick(beat);
+        self.condvar.notify_one();
+    }
+}
+
+#[derive(Clone)]
+pub(crate) struct VerifyAuthorityDaemon {
+    state: Arc<Mutex<VerifyAuthorityState>>,
+    beat_tickers: Vec<SharedBeatTicker>,
+    condvar: Arc<Condvar>,
+}
+
+impl VerifyAuthorityDaemon {
+    pub fn create(peer_count: usize) -> Self {
+        Self {
+            state: Arc::new(Mutex::new(VerifyAuthorityState::create(
+                peer_count,
+            ))),
+            beat_tickers: (0..peer_count)
+                .map(|_| SharedBeatTicker::create())
+                .collect(),
+            condvar: Arc::new(Condvar::new()),
+        }
+    }
+
+    pub fn wait_for(&self, timeout: Duration) {
+        let mut guard = self.state.lock();
+        self.condvar.wait_for(&mut guard, timeout);
+    }
+
+    pub fn reset_state(&self, term: Term) {
+        self.state.lock().reset(term);
+        // Increase all beats by one to make sure upcoming verify authority
+        // requests wait for beats in the current term. This in fact creates
+        // phantom beats that will never be marked as completed by themselves.
+        // They will be automatically `ticked()` when newer (real) beats are
+        // created, sent and `ticked()`.
+        for beat_ticker in self.beat_tickers.iter() {
+            beat_ticker.next_beat();
+        }
+    }
+
+    /// Enqueues a verify authority request. Returns a receiver of the
+    /// verification result. Returns None if the term has passed.
+    pub fn verify_authority_async(
+        &self,
+        current_term: Term,
+        commit_index: Index,
+    ) -> Option<tokio::sync::oneshot::Receiver<VerifyAuthorityResult>> {
+        let mut state = self.state.lock();
+        // The inflight beats are sent at least for `current_term`. This is
+        // guaranteed by the fact that we immediately increase beats for all
+        // peers after being elected, before releasing the "elected" message to
+        // the rest of the Raft system. The newest beats we get here are at
+        // least as new as the phantom beats created by `Self::reset_state()`.
+        let beats_moment = self
+            .beat_tickers
+            .iter()
+            .map(|beat_ticker| beat_ticker.current_beat())
+            .collect();
+
+        // The inflight beats could also be for any term after `current_term`.
+        // We must check if the term stored in the daemon is the same as
+        // `current_term`.
+        // `state.term` could be smaller than `current_term`, if a new term is
+        // started by someone else and we lost leadership.
+        // `state.term` could be greater than `current_term`, if we lost
+        // leadership but are elected leader again in a following term.
+        // In both cases, we cannot confirm the leadership at `current_term`.
+        if state.term != current_term {
+            return None;
+        }
+
+        let (sender, receiver) = tokio::sync::oneshot::channel();
+        let token = VerifyAuthorityToken {
+            commit_index,
+            beats_moment,
+            rough_time: Instant::now(),
+            sender,
+        };
+        state.queue.push_back(token);
+
+        Some(receiver)
+    }
+
+    /// Run one iteration of the verify authority daemon.
+    pub fn run_verify_authority_iteration(
+        &self,
+        current_term: Term,
+        commit_index: Index,
+        sentinel_commit_index: Index,
+    ) {
+        // Opportunistic check: do nothing if we don't have any requests.
+        if self.state.lock().queue.is_empty() {
+            return;
+        }
+
+        self.clear_committed_requests(current_term, commit_index);
+        // Do not use ticks to clear requests if we have not committed at least
+        // one log entry since the start of the term. At the start of the term,
+        // the leader might not know the commit index of the previous leader.
+        // This holds true even though it is guaranteed that all entries committed by
+        // the previous leader will be committed by the current leader.
+        if commit_index >= sentinel_commit_index {
+            self.clear_ticked_requests();
+        }
+        self.removed_expired_requests(current_term);
+    }
+
+    /// Clears all requests that have seen at least one commit.
+    /// This function handles the following scenario: a verify authority request
+    /// was received, when the `commit_index` was at C. Later as the leader we
+    /// moved the commit index to at least C+1. That implies that when the
+    /// request was first received, no other new commits after C could have been
+    /// added to the log, either by this replica or others. It then follows that
+    /// we can claim we had authority at that point.
+    fn clear_committed_requests(
+        &self,
+        current_term: Term,
+        commit_index: Index,
+    ) {
+        let mut state = self.state.lock();
+        // We might skip some requests that could have been cleared, if we did
+        // not react to the commit notification fast enough, and missed a
+        // commit. This is about the case where in the last iteration
+        // `commit_index` was `ci`, but in this iteration it becomes `ci + 2`
+        // (or even larger), skipping `ci + 1`.
+        //
+        // Obviously skipping a commit is a problem if `ci + 2` and `ci + 1` are
+        // both committed by us in this term. The requests that are cleared by
+        // `+1` will be cleared by `+2` anyway. Similarly it is not a problem if
+        // neither are committed by us in this term, since `+1` will not clear
+        // any requests.
+        //
+        // If `+2` is not committed by us, but `+1` is, we lose the opportunity
+        // to use `+1` to clear requests. The chances of losing this opportunity
+        // are slim, because between `+1` and `+2`, there has to be a missed
+        // heartbeat interval, and a new commit (`+2`) from another leader. We
+        // have plenty of time to run this method before `+2` reaches us.
+        //
+        // Overall it is acceptable to simplify the implementation and risk
+        // losing the mentioned opportunity.
+        if current_term != state.term {
+            return;
+        }
+
+        // Note the commit_index in the queue might not be in increasing order.
+        // We could still have requests that have a smaller commit_index after
+        // this sweep. That is an acceptable tradeoff we are taking.
+        while let Some(head) = state.queue.pop_front() {
+            if head.commit_index >= commit_index {
+                state.queue.push_front(head);
+                break;
+            }
+            // At the start of the term, the previous leader might have exposed
+            // all entries before the sentinel commit to clients. If a request
+            // arrived before the sentinel commit is committed, its commit index
+            // (token.commit_index) might be inaccurate. Thus we cannot allow
+            // the client to return any state before the sentinel index.
+            //
+            // We did not choose the sentinel index but opted for a more strict
+            // commit index, because the index is committed anyway. It should be
+            // delivered to the application really quickly. We paid the price
+            // with latency but made the request more fresh.
+            let _ = head
+                .sender
+                .send(VerifyAuthorityResult::Success(commit_index));
+            state.start.0 += 1;
+        }
+    }
+
+    /// Fetches the newest successful RPC response from peers, and mark verify
+    /// authority requests as complete if they are covered by more than half of
+    /// the replicas.
+    fn clear_ticked_requests(&self) {
+        for (peer_index, beat_ticker) in self.beat_tickers.iter().enumerate() {
+            // Fetches the newest successful RPC response from the current peer.
+            let ticked = beat_ticker.ticked();
+            let mut state = self.state.lock();
+            // Update progress with `ticked`. All requests that came before
+            // `ticked` now have one more votes of leader authority from the
+            // current peer. `partition_point` relies on `beats_moment` being
+            // non-decreasing along the queue.
+            let first_not_ticked_index = state.queue.partition_point(|token| {
+                token.beats_moment[peer_index] <= ticked
+            });
+            // `state.start` is the queue-global index of the queue front, so
+            // `new_covered` is the queue-global index of the first request not
+            // yet acknowledged by this peer.
+            let new_covered = first_not_ticked_index + state.start.0;
+            // Coverage is monotonic: `ticked` never moves backwards.
+            assert!(new_covered >= state.covered[peer_index].0);
+            state.covered[peer_index].0 = new_covered;
+
+            // Count the requests that have more than N / 2 votes. We always have
+            // the vote from ourselves, but the value is 0 in `covered` array.
+            // After sorting ascending, the element at index `len / 2 + 1` is
+            // covered by the peers sorted after it, which together with our
+            // own implicit vote forms a majority (see the assert below).
+            // NOTE(review): `mid == sorted_covered.len()` when `len <= 2`,
+            // which would panic on indexing; presumably the cluster size is
+            // always at least 3 — confirm.
+            let mut sorted_covered = state.covered.to_owned();
+            sorted_covered.sort_unstable();
+            let mid = sorted_covered.len() / 2 + 1;
+            let new_start = sorted_covered[mid];
+
+            // `state.start` could have been moved by other means, e.g. by a
+            // subsequent commit of the same term after the beat is issued.
+            // Then the relevant verify authority requests have been processed.
+            // If all ticked requests have been processed, nothing needs to be
+            // done. Skip to the next iteration.
+            if new_start <= state.start {
+                continue;
+            }
+
+            // All requests before `new_start` are now verified.
+            let verified = new_start.0 - state.start.0;
+            for token in state.queue.drain(..verified) {
+                // Sanity check: re-count acknowledging peers and assert that a
+                // strict majority (including ourselves) covers this request.
+                let mut cnt = 0;
+                for (index, beat) in token.beats_moment.iter().enumerate() {
+                    if self.beat_tickers[index].ticked() >= *beat {
+                        cnt += 1;
+                    }
+                }
+                assert!(cnt + cnt + 1 >= self.beat_tickers.len());
+                // The receiver may have been dropped; ignore send failures.
+                let _ = token
+                    .sender
+                    .send(VerifyAuthorityResult::Success(token.commit_index));
+            }
+            // Move the queue starting point.
+            state.start = new_start;
+        }
+    }
+
+    /// How long a verify authority request may stay queued after we lose
+    /// leadership before it is failed with `TimedOut` (two heartbeat
+    /// intervals).
+    const VERIFY_AUTHORITY_REQUEST_EXPIRATION: Duration =
+        Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS * 2);
+
+    /// Remove expired requests if we are no longer the leader.
+    /// If we have lost leadership, we are unlikely to receive confirmations
+    /// of past leadership state from peers. Requests are expired after two
+    /// heartbeat periods have passed. We do not immediately cancel all incoming
+    /// requests, in hope that we could still answer them accurately without
+    /// breaking the consistency guarantee.
+    ///
+    /// NOTE(review): the name looks like a typo for `remove_expired_requests`;
+    /// renaming would touch every call site, so it is left as-is here.
+    fn removed_expired_requests(&self, current_term: Term) {
+        let mut state = self.state.lock();
+        // Return if we are still the leader, or we become the leader again.
+        //
+        // Note that we do not hold the main raft state lock, thus the value of
+        // `current_term` might not be up-to-date. We only update `state.term`
+        // after an election. If in a term after `current_term`, we are elected
+        // leader again, `state.term` could be updated and thus greater than the
+        // (now stale) `current_term`. In that case, the queue should have been
+        // reset. There will be no expired request to remove.
+        if state.term >= current_term {
+            return;
+        }
+
+        let expiring_line =
+            Instant::now() - Self::VERIFY_AUTHORITY_REQUEST_EXPIRATION;
+        // Assuming bounded clock skew, otherwise we will lose efficiency.
+        let expired =
+            |head: &VerifyAuthorityToken| head.rough_time < expiring_line;
+        // Note rough_time might not be in increasing order, so we might still
+        // have requests that are expired in the queue after the sweep.
+        while state.queue.front().map_or(false, expired) {
+            state
+                .queue
+                .pop_front()
+                // The receiver may have been dropped; ignore send failures.
+                .map(|head| head.sender.send(VerifyAuthorityResult::TimedOut));
+            state.start.0 += 1;
+        }
+    }
+
+    /// Returns a `DaemonBeatTicker` handle for the peer at `peer_index`.
+    /// The handle carries a clone of this daemon's condvar, presumably so
+    /// that recording a beat can wake the daemon loop — confirm in
+    /// `DaemonBeatTicker`.
+    pub fn beat_ticker(&self, peer_index: usize) -> DaemonBeatTicker {
+        DaemonBeatTicker {
+            beat_ticker: self.beat_tickers[peer_index].clone(),
+            condvar: self.condvar.clone(),
+        }
+    }
+
+    /// Shuts the daemon down: fails all pending verify authority requests and
+    /// wakes any thread waiting on the condvar so it can observe the shutdown.
+    pub fn kill(&self) {
+        let term = self.state.lock().term;
+        // Fail all inflight verify authority requests. It is important to do
+        // this so that the RPC framework could drop requests served by us and
+        // release all references to the Raft instance.
+        self.reset_state(term);
+        self.condvar.notify_all();
+    }
+}
+
+impl<Command: 'static + Send> Raft<Command> {
+    /// Upper bound on how long the daemon sleeps between iterations when no
+    /// event wakes it earlier (used as the `wait_for` timeout below).
+    const BEAT_RECORDING_MAX_PAUSE: Duration = Duration::from_millis(20);
+
+    /// Spawns a thread that runs the verify authority daemon.
+    pub(crate) fn run_verify_authority_daemon(&self) {
+        let me = self.me.clone();
+        let keep_running = self.keep_running.clone();
+        let daemon_env = self.daemon_env.clone();
+        let this_daemon = self.verify_authority_daemon.clone();
+        let rf = self.inner_state.clone();
+        let stop_wait_group = self.stop_wait_group.clone();
+
+        let join_handle = std::thread::spawn(move || {
+            // Note: do not change this to `let _ = ...`.
+            // `let _ = ...` would drop the guard immediately instead of at
+            // the end of the scope.
+            let _guard = daemon_env.for_scope();
+
+            log::info!("{:?} verify authority daemon running ...", me);
+            while keep_running.load(Ordering::Acquire) {
+                this_daemon.wait_for(Self::BEAT_RECORDING_MAX_PAUSE);
+                // Take a consistent snapshot of the raft state under the lock,
+                // then release it before running the (potentially slower)
+                // iteration.
+                let (current_term, commit_index, sentinel) = {
+                    let rf = rf.lock();
+                    (rf.current_term, rf.commit_index, rf.sentinel_commit_index)
+                };
+                this_daemon.run_verify_authority_iteration(
+                    current_term,
+                    commit_index,
+                    sentinel,
+                );
+            }
+            log::info!("{:?} verify authority daemon done.", me);
+
+            drop(stop_wait_group);
+        });
+        self.daemon_env
+            .watch_daemon(Daemon::VerifyAuthority, join_handle);
+    }
+
+    /// Create a verify authority request. Returns None if we are not the
+    /// leader.
+    ///
+    /// A successful verification allows the application to respond to read-only
+    /// requests that arrived before this function is called. The answer must
+    /// include all commands at or before a certain index, which is returned to
+    /// the application with the successful verification result. The index is
+    /// in fact the commit index at the moment this function was called. It is
+    /// guaranteed that no other commands could possibly have been committed at
+    /// the moment this function was called.
+    ///
+    /// The application is also free to include any subsequent commits in the
+    /// response. Consistency is still guaranteed, because Raft never rolls back
+    /// committed commands.
+    pub fn verify_authority_async(
+        &self,
+    ) -> Option<impl Future<Output = crate::VerifyAuthorityResult>> {
+        // Fail the request if we have been killed.
+        if !self.keep_running.load(Ordering::Acquire) {
+            return None;
+        }
+
+        let (term, commit_index) = {
+            let rf = self.inner_state.lock();
+            if !rf.is_leader() {
+                // Returning none instead of `Pending::Ready(TermElapsed)`,
+                // because that requires a separate struct that implements
+                // Future, which is tedious to write.
+                return None;
+            }
+
+            (rf.current_term, rf.commit_index)
+        };
+        let receiver = self
+            .verify_authority_daemon
+            .verify_authority_async(term, commit_index);
+        // Trigger heartbeats right away so peer confirmations of our
+        // authority arrive without waiting for the next heartbeat interval.
+        self.heartbeats_daemon.trigger();
+        receiver.map(|receiver| async move {
+            receiver
+                .await
+                .expect("Verify authority daemon never drops senders")
+        })
+    }
+
+    /// Convenience accessor that forwards to the daemon's `beat_ticker`.
+    pub(crate) fn beat_ticker(&self, peer_index: usize) -> DaemonBeatTicker {
+        self.verify_authority_daemon.beat_ticker(peer_index)
+    }
+}