Forráskód Böngészése

Create a dedicated struct for becoming the next leader.

We cannot clone the whole raft struct because:

1. A free standing raft struct blocks shutdown.
2. Election cancellation is not guaranteed.
Jing Yang 3 éve
szülő
commit
18cbd8e0c6
1 módosított fájl, 38 hozzáadás és 10 törlés
  1. 38 10
      src/election.rs

+ 38 - 10
src/election.rs

@@ -1,12 +1,18 @@
 use std::sync::atomic::Ordering;
+use std::sync::Arc;
 use std::time::{Duration, Instant};
 
 use parking_lot::{Condvar, Mutex};
 use rand::{thread_rng, Rng};
 
 use crate::remote_context::RemoteContext;
+use crate::sync_log_entries::SyncLogEntriesComms;
 use crate::utils::{retry_rpc, RPC_DEADLINE};
-use crate::{Peer, Raft, ReplicableCommand, RequestVoteArgs, State, Term};
+use crate::verify_authority::VerifyAuthorityDaemon;
+use crate::{
+    Peer, Persister, Raft, RaftState, ReplicableCommand, RequestVoteArgs,
+    State, Term,
+};
 
 struct VersionedDeadline {
     version: usize,
@@ -67,6 +73,18 @@ impl ElectionState {
     }
 }
 
+/// A helper struct that stores all the information needed
+/// to become the leader.
+struct ElectionCandidate<Command> {
+    me: Peer,
+    term: Term,
+    rf: Arc<Mutex<RaftState<Command>>>,
+    election: Arc<ElectionState>,
+    new_log_entry: SyncLogEntriesComms,
+    verify_authority_daemon: VerifyAuthorityDaemon,
+    persister: Arc<dyn Persister>,
+}
+
 // Command must be
 // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
 // 1. clone: they are copied to the persister.
@@ -248,13 +266,22 @@ impl<Command: ReplicableCommand> Raft<Command> {
 
         let votes = self.spawn_request_votes(args);
         let (tx, rx) = futures_channel::oneshot::channel();
-        let this = self.clone();
+        let candidate = ElectionCandidate {
+            me: self.me,
+            term,
+            rf: self.inner_state.clone(),
+            election: self.election.clone(),
+            new_log_entry: self.sync_log_entries_comms.clone(),
+            verify_authority_daemon: self.verify_authority_daemon.clone(),
+            persister: self.persister.clone(),
+        };
+
         self.thread_pool.spawn(async move {
             if !Self::quorum_before_cancelled(votes, rx).await {
                 return;
             }
 
-            this.become_leader(term);
+            Self::become_leader(candidate);
         });
         Some(tx)
     }
@@ -331,21 +358,22 @@ impl<Command: ReplicableCommand> Raft<Command> {
         return vote_count >= quorum;
     }
 
-    fn become_leader(&self, term: Term) {
-        let mut rf = self.inner_state.lock();
+    fn become_leader(this: ElectionCandidate<Command>) {
+        let term = this.term;
+        let mut rf = this.rf.lock();
         if rf.current_term == term && rf.state == State::Candidate {
             // We are the leader now. The election timer can be stopped.
-            self.election.stop_election_timer();
+            this.election.stop_election_timer();
 
             rf.state = State::Leader;
-            rf.leader_id = self.me;
+            rf.leader_id = this.me;
             rf.match_index.fill(0);
 
             let sentinel_commit_index;
             if rf.commit_index != rf.log.last_index_term().index {
                 // Create a sentinel commit at the start of the term.
                 sentinel_commit_index = rf.log.add_term_change_entry(term);
-                self.persister.save_state(rf.persisted_state().into());
+                this.persister.save_state(rf.persisted_state().into());
             } else {
                 sentinel_commit_index = rf.commit_index;
             }
@@ -355,11 +383,11 @@ impl<Command: ReplicableCommand> Raft<Command> {
             // authority.
             // No verity authority request can go through before the reset is
             // done, since we are holding the raft lock.
-            self.verify_authority_daemon
+            this.verify_authority_daemon
                 .reset_state(term, sentinel_commit_index);
 
             // Sync all logs now.
-            self.sync_log_entries_comms
+            this.new_log_entry
                 .reset_progress(term, sentinel_commit_index);
         }
     }