Sfoglia il codice sorgente

Add a prevote phase to election: merge branch 'prevote' #3

Jing Yang 3 anni fa
parent
commit
634e631085
4 ha cambiato i file con 154 aggiunte e 61 eliminazioni
  1. 140 61
      src/election.rs
  2. 1 0
      src/messages.rs
  3. 12 0
      src/process_request_vote.rs
  4. 1 0
      src/utils/integration_test.rs

+ 140 - 61
src/election.rs

@@ -10,8 +10,8 @@ use crate::sync_log_entries::SyncLogEntriesComms;
 use crate::utils::{retry_rpc, RPC_DEADLINE};
 use crate::verify_authority::VerifyAuthorityDaemon;
 use crate::{
-    Peer, Persister, Raft, RaftState, ReplicableCommand, RequestVoteArgs,
-    State, Term,
+    IndexTerm, Peer, Persister, Raft, RaftState, ReplicableCommand,
+    RequestVoteArgs, State, Term,
 };
 
 struct VersionedDeadline {
@@ -73,6 +73,26 @@ impl ElectionState {
     }
 }
 
+/// A helper struct that stores all the information needed
+/// to become the leader.
+struct ElectionCandidate<Command> {
+    me: Peer,
+    peers: Vec<Peer>,
+    rf: Arc<Mutex<RaftState<Command>>>,
+    election: Arc<ElectionState>,
+    new_log_entry: SyncLogEntriesComms,
+    verify_authority_daemon: VerifyAuthorityDaemon,
+    persister: Arc<dyn Persister>,
+    thread_pool: tokio::runtime::Handle,
+}
+
+/// Result of a vote. Returns the cancellation token if it is not cancelled.
+enum QuorumOrCancelled {
+    Accepted(futures_channel::oneshot::Receiver<()>),
+    Rejected,
+    Cancelled,
+}
+
 // Command must be
 // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
 // 1. clone: they are copied to the persister.
@@ -134,7 +154,13 @@ impl<Command: ReplicableCommand> Raft<Command> {
         let mut should_run = None;
         while self.keep_running.load(Ordering::Relaxed) {
             let mut cancel_handle = should_run.and_then(|last_timer_count| {
-                self.run_election(last_timer_count)
+                // Election can only be run when we are **not** holding the
+                // election timer lock. The reason is that scheduling a next
+                // election requires the rf lock. We always acquire the rf lock
+                // before the election timer lock in every other place. Here we
+                // must follow the same pattern to avoid deadlock: dropping the
+                // election timer lock then acquiring the two locks in order.
+                self.schedule_election(last_timer_count)
             });
 
             let mut guard = election.timer.lock();
@@ -215,13 +241,12 @@ impl<Command: ReplicableCommand> Raft<Command> {
         log::info!("{:?} election timer daemon done.", self.me);
     }
 
-    fn run_election(
+    fn schedule_election(
         &self,
         timer_count: usize,
     ) -> Option<futures_channel::oneshot::Sender<()>> {
-        let me = self.me;
-        let (term, args) = {
-            let mut rf = self.inner_state.lock();
+        let (term, last_log_index) = {
+            let rf = self.inner_state.lock();
 
             // The previous election is faster and reached the critical section
             // before us. We should stop and not run this election.
@@ -230,51 +255,86 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 return None;
             }
 
+            (rf.current_term, rf.log.last_index_term())
+        };
+
+        let (tx, rx) = futures_channel::oneshot::channel();
+        let candidate = ElectionCandidate {
+            me: self.me,
+            peers: self.peers.clone(),
+            rf: self.inner_state.clone(),
+            election: self.election.clone(),
+            new_log_entry: self.sync_log_entries_comms.clone(),
+            verify_authority_daemon: self.verify_authority_daemon.clone(),
+            persister: self.persister.clone(),
+            thread_pool: self.thread_pool.clone(),
+        };
+        self.thread_pool.spawn(Self::run_election(
+            term,
+            candidate,
+            last_log_index,
+            rx,
+        ));
+        Some(tx)
+    }
+
+    async fn run_election(
+        term: Term,
+        candidate: ElectionCandidate<Command>,
+        last_index_term: IndexTerm,
+        cancel_token: futures_channel::oneshot::Receiver<()>,
+    ) {
+        let me = candidate.me;
+        let prevote_args = RequestVoteArgs {
+            term,
+            candidate_id: me,
+            last_log_index: last_index_term.index,
+            last_log_term: last_index_term.term,
+            prevote: true,
+        };
+
+        // Run the prevote phase first.
+        let prevotes = Self::spawn_request_votes(&candidate, prevote_args);
+        let prevote_results =
+            Self::quorum_before_cancelled(prevotes, cancel_token).await;
+        let cancel_token = match prevote_results {
+            QuorumOrCancelled::Accepted(cancel_token) => cancel_token,
+            // Did not get quorum on a prevote. Skip the rest.
+            _ => return,
+        };
+
+        // Advance to the next term.
+        let (next_term, (last_log_index, last_log_term)) = {
+            let mut rf = candidate.rf.lock();
+
+            if term != rf.current_term {
+                return;
+            }
+
             rf.current_term.0 += 1;
 
             rf.voted_for = Some(me);
             rf.state = State::Candidate;
 
-            self.persister.save_state(rf.persisted_state().into());
+            candidate.persister.save_state(rf.persisted_state().into());
 
-            let term = rf.current_term;
-            let (last_log_index, last_log_term) =
-                rf.log.last_index_term().unpack();
+            (rf.current_term, rf.log.last_index_term().unpack())
+        };
 
-            (
-                term,
-                RequestVoteArgs {
-                    term,
-                    candidate_id: me,
-                    last_log_index,
-                    last_log_term,
-                },
-            )
+        let args = RequestVoteArgs {
+            term: next_term,
+            candidate_id: me,
+            last_log_index,
+            last_log_term,
+            prevote: false,
         };
 
-        let mut votes = vec![];
-        for peer in self.peers.clone().into_iter() {
-            if peer != self.me {
-                let one_vote = self
-                    .thread_pool
-                    .spawn(Self::request_vote(peer, args.clone()));
-                votes.push(one_vote);
-            }
+        let votes = Self::spawn_request_votes(&candidate, args);
+        if let QuorumOrCancelled::Accepted(_) =
+            Self::quorum_before_cancelled(votes, cancel_token).await
+        {
+            Self::become_leader(next_term, candidate);
         }
-
-        let (tx, rx) = futures_channel::oneshot::channel();
-        self.thread_pool.spawn(Self::count_vote_util_cancelled(
-            me,
-            term,
-            self.inner_state.clone(),
-            votes,
-            rx,
-            self.election.clone(),
-            self.sync_log_entries_comms.clone(),
-            self.verify_authority_daemon.clone(),
-            self.persister.clone(),
-        ));
-        Some(tx)
     }
 
     const REQUEST_VOTE_RETRY: usize = 1;
@@ -294,18 +354,26 @@ impl<Command: ReplicableCommand> Raft<Command> {
         None
     }
 
-    #[allow(clippy::too_many_arguments)]
-    async fn count_vote_util_cancelled(
-        me: Peer,
-        term: Term,
-        rf: Arc<Mutex<RaftState<Command>>>,
+    fn spawn_request_votes(
+        candidate: &ElectionCandidate<Command>,
+        args: RequestVoteArgs,
+    ) -> Vec<tokio::task::JoinHandle<Option<bool>>> {
+        let mut votes = vec![];
+        for peer in candidate.peers.clone().into_iter() {
+            if peer != candidate.me {
+                let one_vote = candidate
+                    .thread_pool
+                    .spawn(Self::request_vote(peer, args.clone()));
+                votes.push(one_vote);
+            }
+        }
+        return votes;
+    }
+
+    async fn quorum_before_cancelled(
         votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
         cancel_token: futures_channel::oneshot::Receiver<()>,
-        election: Arc<ElectionState>,
-        new_log_entry: SyncLogEntriesComms,
-        verify_authority_daemon: VerifyAuthorityDaemon,
-        persister: Arc<dyn Persister>,
-    ) {
+    ) -> QuorumOrCancelled {
         let quorum = votes.len() >> 1;
         let mut vote_count = 0;
         let mut against_count = 0;
@@ -322,7 +390,9 @@ impl<Command: ReplicableCommand> Raft<Command> {
             )
             .await;
             let ((one_vote, _, rest), new_token) = match selected {
-                futures_util::future::Either::Left(_) => break,
+                futures_util::future::Either::Left(_) => {
+                    return QuorumOrCancelled::Cancelled
+                }
                 futures_util::future::Either::Right(tuple) => tuple,
             };
 
@@ -338,13 +408,20 @@ impl<Command: ReplicableCommand> Raft<Command> {
             }
         }
 
-        if vote_count < quorum {
-            return;
-        }
-        let mut rf = rf.lock();
+        return if vote_count < quorum {
+            QuorumOrCancelled::Rejected
+        } else {
+            QuorumOrCancelled::Accepted(cancel_token)
+        };
+    }
+
+    fn become_leader(term: Term, this: ElectionCandidate<Command>) {
+        let me = this.me;
+        log::info!("{me:?} voted as leader at term {term:?}");
+        let mut rf = this.rf.lock();
         if rf.current_term == term && rf.state == State::Candidate {
             // We are the leader now. The election timer can be stopped.
-            election.stop_election_timer();
+            this.election.stop_election_timer();
 
             rf.state = State::Leader;
             rf.leader_id = me;
@@ -354,7 +431,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             if rf.commit_index != rf.log.last_index_term().index {
                 // Create a sentinel commit at the start of the term.
                 sentinel_commit_index = rf.log.add_term_change_entry(term);
-                persister.save_state(rf.persisted_state().into());
+                this.persister.save_state(rf.persisted_state().into());
             } else {
                 sentinel_commit_index = rf.commit_index;
             }
@@ -364,10 +441,12 @@ impl<Command: ReplicableCommand> Raft<Command> {
             // authority.
             // No verity authority request can go through before the reset is
             // done, since we are holding the raft lock.
-            verify_authority_daemon.reset_state(term, sentinel_commit_index);
+            this.verify_authority_daemon
+                .reset_state(term, sentinel_commit_index);
 
             // Sync all logs now.
-            new_log_entry.reset_progress(term, sentinel_commit_index);
+            this.new_log_entry
+                .reset_progress(term, sentinel_commit_index);
         }
     }
 }

+ 1 - 0
src/messages.rs

@@ -10,6 +10,7 @@ pub struct RequestVoteArgs {
     pub(crate) candidate_id: Peer,
     pub(crate) last_log_index: Index,
     pub(crate) last_log_term: Term,
+    pub(crate) prevote: bool,
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]

+ 12 - 0
src/process_request_vote.rs

@@ -14,6 +14,18 @@ impl<Command: Clone + serde::Serialize> Raft<Command> {
         let mut rf = self.inner_state.lock();
 
         let term = rf.current_term;
+
+        if args.prevote {
+            let last_log = rf.log.last_index_term();
+            let longer_log = args.last_log_term > last_log.term
+                || (args.last_log_term == last_log.term
+                    && args.last_log_index >= last_log.index);
+            return RequestVoteReply {
+                term: args.term,
+                vote_granted: args.term >= term && longer_log,
+            };
+        }
+
         #[allow(clippy::comparison_chain)]
         if args.term < term {
             return RequestVoteReply {

+ 1 - 0
src/utils/integration_test.rs

@@ -16,6 +16,7 @@ pub fn make_request_vote_args(
         candidate_id: Peer(peer_id),
         last_log_index,
         last_log_term,
+        prevote: false,
     }
 }