Ver Fonte

Add active-step-down to guarantee liveness.

This is only useful in specific cases, where a follower would refuse
to vote for a new candidate while the present leader is still
functioning.

If a follower simply follow the orginal rule of voting for the largest
term, this change would not be needed.
Jing Yang há 3 anos atrás
pai
commit
dae0a22fec
4 ficheiros alterados com 84 adições e 13 exclusões
  1. 57 0
      src/check_quorum.rs
  2. 1 0
      src/lib.rs
  3. 3 1
      src/raft.rs
  4. 23 12
      src/verify_authority.rs

+ 57 - 0
src/check_quorum.rs

@@ -0,0 +1,57 @@
+use std::sync::atomic::Ordering;
+use std::time::Duration;
+
+use crate::{Raft, ReplicableCommand};
+
+impl<Command: ReplicableCommand> Raft<Command> {
+    pub fn schedule_check_quorum(&self, interval: Duration) {
+        let me = self.me;
+        let keep_running = self.keep_running.clone();
+        let rf = self.inner_state.clone();
+        let election = self.election.clone();
+        let persister = self.persister.clone();
+
+        let verify_authority_daemon = self.verify_authority_daemon.clone();
+        let heartbeats_daemon = self.heartbeats_daemon.clone();
+
+        self.thread_pool.spawn(async move {
+            let mut interval = tokio::time::interval(interval);
+
+            while keep_running.load(Ordering::Relaxed) {
+                let (is_leader, term) = {
+                    let rf = rf.lock();
+                    (rf.is_leader(), rf.current_term)
+                };
+
+                if !is_leader {
+                    // Skip the rest of the loop if we are not the leader.
+                    interval.tick().await;
+                    continue;
+                }
+
+                // Technically we shouldn't get beats if we are not the leader,
+                // but it does not hurt since we acquired the soft term lock.
+                let beats_moment = verify_authority_daemon.beats_moment();
+
+                heartbeats_daemon.trigger(false);
+                interval.tick().await;
+
+                // If we had authority in the past, that means we have not lost
+                // contact with followers. Keep going.
+                if verify_authority_daemon.verify_beats_moment(beats_moment) {
+                    continue;
+                }
+
+                let mut rf = rf.lock();
+                // Only step down if we are still the leader at the same term.
+                if rf.is_leader() && rf.current_term == term {
+                    log::warn!("Leader {me:?} lost quorum, stepping down.");
+
+                    rf.step_down();
+                    election.reset_election_timer();
+                    persister.save_state(rf.persisted_state().into());
+                }
+            }
+        });
+    }
+}

+ 1 - 0
src/lib.rs

@@ -15,6 +15,7 @@ pub(crate) use crate::raft_state::State;
 
 mod apply_command;
 mod beat_ticker;
+mod check_quorum;
 mod daemon_env;
 mod daemon_watch;
 mod election;

+ 3 - 1
src/raft.rs

@@ -173,9 +173,11 @@ impl<Command: ReplicableCommand> Raft<Command> {
         // Running in a standalone thread.
         let apply_command_daemon = this.run_apply_command_daemon(apply_command);
         daemon_watch.create_daemon(Daemon::ApplyCommand, apply_command_daemon);
-        // One off function that schedules many little tasks, running on the
+        // One off functions that schedule many little tasks, running on the
         // internal thread pool.
         this.schedule_heartbeats(HEARTBEAT_INTERVAL);
+        this.schedule_check_quorum(HEARTBEAT_INTERVAL * 2);
+
         // The last step is to start running election timer.
         daemon_watch.create_daemon(Daemon::ElectionTimer, {
             let raft = this.clone();

+ 23 - 12
src/verify_authority.rs

@@ -133,6 +133,14 @@ impl VerifyAuthorityDaemon {
         }
     }
 
+    /// Returns a snapshot of current beats as a token to verify authority.
+    pub fn beats_moment(&self) -> Vec<Beat> {
+        self.beat_tickers
+            .iter()
+            .map(|beat_ticker| beat_ticker.current_beat())
+            .collect()
+    }
+
     /// Enqueues a verify authority request. Returns a receiver of the
     /// verification result. Returns None if the term has passed.
     pub fn verify_authority_async(
@@ -146,11 +154,7 @@ impl VerifyAuthorityDaemon {
         // peers after being elected, before releasing the "elected" message to
         // the rest of the Raft system. The newest beats we get here are at
         // least as new as the phantom beats created by `Self::reset_state()`.
-        let beats_moment = self
-            .beat_tickers
-            .iter()
-            .map(|beat_ticker| beat_ticker.current_beat())
-            .collect();
+        let beats_moment = self.beats_moment();
 
         // The inflight beats could also be for any term after `current_term`.
         // We must check if the term stored in the daemon is the same as
@@ -191,6 +195,18 @@ impl VerifyAuthorityDaemon {
         self.remove_expired_requests(current_term);
     }
 
+    /// Verifies that at `beats_moment` we had authority.
+    pub fn verify_beats_moment(&self, beats_moment: Vec<Beat>) -> bool {
+        let mut cnt = 0;
+        for (index, beat) in beats_moment.iter().enumerate() {
+            if self.beat_tickers[index].ticked() >= *beat {
+                cnt += 1;
+            }
+        }
+
+        cnt + cnt + 1 >= self.beat_tickers.len()
+    }
+
     /// Fetches the newest successful RPC response from peers, and mark verify
     /// authority requests as complete if they are covered by more than half of
     /// the replicas.
@@ -243,13 +259,8 @@ impl VerifyAuthorityDaemon {
             let verified = new_start.0 - state.start.0;
             let sentinel_commit_index = state.sentinel_commit_index;
             for token in state.queue.drain(..verified) {
-                let mut cnt = 0;
-                for (index, beat) in token.beats_moment.iter().enumerate() {
-                    if self.beat_tickers[index].ticked() >= *beat {
-                        cnt += 1;
-                    }
-                }
-                assert!(cnt + cnt + 1 >= self.beat_tickers.len());
+                // Double check that we indeed had authority at that moment.
+                assert!(self.verify_beats_moment(token.beats_moment));
 
                 // Never verify authority before the sentinel commit index. The
                 // previous leader might have exposed data up to the commit