Parcourir la source

Implement the prevote phase on both client and server side.

Jing Yang il y a 3 ans
Parent
commit
16c2187ab3
4 fichiers modifiés avec 117 ajouts et 45 suppressions
  1. 103 45
      src/election.rs
  2. 1 0
      src/messages.rs
  3. 12 0
      src/process_request_vote.rs
  4. 1 0
      src/utils/integration_test.rs

+ 103 - 45
src/election.rs

@@ -10,8 +10,8 @@ use crate::sync_log_entries::SyncLogEntriesComms;
 use crate::utils::{retry_rpc, RPC_DEADLINE};
 use crate::verify_authority::VerifyAuthorityDaemon;
 use crate::{
-    Peer, Persister, Raft, RaftState, ReplicableCommand, RequestVoteArgs,
-    State, Term,
+    IndexTerm, Peer, Persister, Raft, RaftState, ReplicableCommand,
+    RequestVoteArgs, State, Term,
 };
 
 struct VersionedDeadline {
@@ -77,12 +77,20 @@ impl ElectionState {
 /// to become the leader.
 struct ElectionCandidate<Command> {
     me: Peer,
-    term: Term,
+    peers: Vec<Peer>,
     rf: Arc<Mutex<RaftState<Command>>>,
     election: Arc<ElectionState>,
     new_log_entry: SyncLogEntriesComms,
     verify_authority_daemon: VerifyAuthorityDaemon,
     persister: Arc<dyn Persister>,
+    thread_pool: tokio::runtime::Handle,
+}
+
+/// Result of a vote. Returns the cancellation token if it is not cancelled.
+enum QuorumOrCancelled {
+    Accepted(futures_channel::oneshot::Receiver<()>),
+    Rejected,
+    Cancelled,
 }
 
 // Command must be
@@ -146,7 +154,13 @@ impl<Command: ReplicableCommand> Raft<Command> {
         let mut should_run = None;
         while self.keep_running.load(Ordering::Relaxed) {
             let mut cancel_handle = should_run.and_then(|last_timer_count| {
-                self.run_election(last_timer_count)
+                // Election can only be run when we are **not** holding the
+                // election timer lock. The reason is that scheduling a next
+                // election requires the rf lock. We always acquire the rf lock
+                // before the election timer lock in every other place. Here we
+                // must follow the same pattern to avoid deadlock: dropping the
+                // election timer lock then acquiring the two locks in order.
+                self.schedule_election(last_timer_count)
             });
 
             let mut guard = election.timer.lock();
@@ -227,13 +241,12 @@ impl<Command: ReplicableCommand> Raft<Command> {
         log::info!("{:?} election timer daemon done.", self.me);
     }
 
-    fn run_election(
+    fn schedule_election(
         &self,
         timer_count: usize,
     ) -> Option<futures_channel::oneshot::Sender<()>> {
-        let me = self.me;
-        let (term, args) = {
-            let mut rf = self.inner_state.lock();
+        let (term, last_log_index) = {
+            let rf = self.inner_state.lock();
 
             // The previous election is faster and reached the critical section
             // before us. We should stop and not run this election.
@@ -242,48 +255,86 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 return None;
             }
 
-            rf.current_term.0 += 1;
-
-            rf.voted_for = Some(me);
-            rf.state = State::Candidate;
-
-            self.persister.save_state(rf.persisted_state().into());
-
-            let term = rf.current_term;
-            let (last_log_index, last_log_term) =
-                rf.log.last_index_term().unpack();
-
-            (
-                term,
-                RequestVoteArgs {
-                    term,
-                    candidate_id: me,
-                    last_log_index,
-                    last_log_term,
-                },
-            )
+            (rf.current_term, rf.log.last_index_term())
         };
 
-        let votes = self.spawn_request_votes(args);
         let (tx, rx) = futures_channel::oneshot::channel();
         let candidate = ElectionCandidate {
             me: self.me,
-            term,
+            peers: self.peers.clone(),
             rf: self.inner_state.clone(),
             election: self.election.clone(),
             new_log_entry: self.sync_log_entries_comms.clone(),
             verify_authority_daemon: self.verify_authority_daemon.clone(),
             persister: self.persister.clone(),
+            thread_pool: self.thread_pool.clone(),
+        };
+        self.thread_pool.spawn(Self::run_election(
+            term,
+            candidate,
+            last_log_index,
+            rx,
+        ));
+        Some(tx)
+    }
+
+    async fn run_election(
+        term: Term,
+        candidate: ElectionCandidate<Command>,
+        last_index_term: IndexTerm,
+        cancel_token: futures_channel::oneshot::Receiver<()>,
+    ) {
+        let me = candidate.me;
+        let prevote_args = RequestVoteArgs {
+            term,
+            candidate_id: me,
+            last_log_index: last_index_term.index,
+            last_log_term: last_index_term.term,
+            prevote: true,
         };
 
-        self.thread_pool.spawn(async move {
-            if !Self::quorum_before_cancelled(votes, rx).await {
+        // Run the prevote phase first.
+        let prevotes = Self::spawn_request_votes(&candidate, prevote_args);
+        let prevote_results =
+            Self::quorum_before_cancelled(prevotes, cancel_token).await;
+        let cancel_token = match prevote_results {
+            QuorumOrCancelled::Accepted(cancel_token) => cancel_token,
+            // Did not get quorum on a prevote. Skip the rest.
+            _ => return,
+        };
+
+        // Advance to the next term.
+        let (next_term, (last_log_index, last_log_term)) = {
+            let mut rf = candidate.rf.lock();
+
+            if term != rf.current_term {
                 return;
             }
 
-            Self::become_leader(candidate);
-        });
-        Some(tx)
+            rf.current_term.0 += 1;
+
+            rf.voted_for = Some(me);
+            rf.state = State::Candidate;
+
+            candidate.persister.save_state(rf.persisted_state().into());
+
+            (rf.current_term, rf.log.last_index_term().unpack())
+        };
+
+        let args = RequestVoteArgs {
+            term: next_term,
+            candidate_id: me,
+            last_log_index,
+            last_log_term,
+            prevote: false,
+        };
+
+        let votes = Self::spawn_request_votes(&candidate, args);
+        if let QuorumOrCancelled::Accepted(_) =
+            Self::quorum_before_cancelled(votes, cancel_token).await
+        {
+            Self::become_leader(next_term, candidate);
+        }
     }
 
     const REQUEST_VOTE_RETRY: usize = 1;
@@ -304,13 +355,13 @@ impl<Command: ReplicableCommand> Raft<Command> {
     }
 
     fn spawn_request_votes(
-        &self,
+        candidate: &ElectionCandidate<Command>,
         args: RequestVoteArgs,
     ) -> Vec<tokio::task::JoinHandle<Option<bool>>> {
         let mut votes = vec![];
-        for peer in self.peers.clone().into_iter() {
-            if peer != self.me {
-                let one_vote = self
+        for peer in candidate.peers.clone().into_iter() {
+            if peer != candidate.me {
+                let one_vote = candidate
                     .thread_pool
                     .spawn(Self::request_vote(peer, args.clone()));
                 votes.push(one_vote);
@@ -322,7 +373,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
     async fn quorum_before_cancelled(
         votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
         cancel_token: futures_channel::oneshot::Receiver<()>,
-    ) -> bool {
+    ) -> QuorumOrCancelled {
         let quorum = votes.len() >> 1;
         let mut vote_count = 0;
         let mut against_count = 0;
@@ -339,7 +390,9 @@ impl<Command: ReplicableCommand> Raft<Command> {
             )
             .await;
             let ((one_vote, _, rest), new_token) = match selected {
-                futures_util::future::Either::Left(_) => break,
+                futures_util::future::Either::Left(_) => {
+                    return QuorumOrCancelled::Cancelled
+                }
                 futures_util::future::Either::Right(tuple) => tuple,
             };
 
@@ -355,18 +408,23 @@ impl<Command: ReplicableCommand> Raft<Command> {
             }
         }
 
-        return vote_count >= quorum;
+        return if vote_count < quorum {
+            QuorumOrCancelled::Rejected
+        } else {
+            QuorumOrCancelled::Accepted(cancel_token)
+        };
     }
 
-    fn become_leader(this: ElectionCandidate<Command>) {
-        let term = this.term;
+    fn become_leader(term: Term, this: ElectionCandidate<Command>) {
+        let me = this.me;
+        log::info!("{me:?} voted as leader at term {term:?}");
         let mut rf = this.rf.lock();
         if rf.current_term == term && rf.state == State::Candidate {
             // We are the leader now. The election timer can be stopped.
             this.election.stop_election_timer();
 
             rf.state = State::Leader;
-            rf.leader_id = this.me;
+            rf.leader_id = me;
             rf.match_index.fill(0);
 
             let sentinel_commit_index;

+ 1 - 0
src/messages.rs

@@ -10,6 +10,7 @@ pub struct RequestVoteArgs {
     pub(crate) candidate_id: Peer,
     pub(crate) last_log_index: Index,
     pub(crate) last_log_term: Term,
+    pub(crate) prevote: bool,
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]

+ 12 - 0
src/process_request_vote.rs

@@ -14,6 +14,18 @@ impl<Command: Clone + serde::Serialize> Raft<Command> {
         let mut rf = self.inner_state.lock();
 
         let term = rf.current_term;
+
+        if args.prevote {
+            let last_log = rf.log.last_index_term();
+            let longer_log = args.last_log_term > last_log.term
+                || (args.last_log_term == last_log.term
+                    && args.last_log_index >= last_log.index);
+            return RequestVoteReply {
+                term: args.term,
+                vote_granted: args.term >= term && longer_log,
+            };
+        }
+
         #[allow(clippy::comparison_chain)]
         if args.term < term {
             return RequestVoteReply {

+ 1 - 0
src/utils/integration_test.rs

@@ -16,6 +16,7 @@ pub fn make_request_vote_args(
         candidate_id: Peer(peer_id),
         last_log_index,
         last_log_term,
+        prevote: false,
     }
 }