Переглянути джерело

Implement an election timer.

Jing Yang 5 роки тому
батько
коміт
69653dc5c8
1 змінених файлів з 120 додано та 57 видалено
  1. 120 57
      src/lib.rs

+ 120 - 57
src/lib.rs

@@ -8,7 +8,7 @@ extern crate tokio;
 
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
 use parking_lot::{Condvar, Mutex};
 use rand::{thread_rng, Rng};
@@ -60,11 +60,13 @@ struct RaftState {
     state: State,
 
     leader_id: Peer,
+}
 
-    // Current election cancel token, might be None if no election is running.
-    election_cancel_token: Option<futures::channel::oneshot::Sender<Term>>,
+struct ElectionState {
     // Timer will be removed upon shutdown or elected.
-    election_timer: Option<tokio::time::Delay>,
+    timer: Mutex<Option<Instant>>,
+    // Wake up the timer thread when the timer is reset or cancelled.
+    signal: Condvar,
 }
 
 #[derive(Clone)]
@@ -77,6 +79,7 @@ pub struct Raft {
     new_log_entry: Option<std::sync::mpsc::Sender<Option<Peer>>>,
     apply_command_signal: Arc<Condvar>,
     keep_running: Arc<AtomicBool>,
+    election: Arc<ElectionState>,
 
     thread_pool: Arc<tokio::runtime::Runtime>,
 }
@@ -136,9 +139,14 @@ impl Raft {
             current_step: Vec::with_capacity(peer_size),
             state: State::Follower,
             leader_id: Peer(me),
-            election_cancel_token: None,
-            election_timer: None
         };
+
+        let election = ElectionState {
+            timer: Mutex::new(None),
+            signal: Condvar::new(),
+        };
+        election.reset_election_timer();
+
         let thread_pool = tokio::runtime::Builder::new()
             .threaded_scheduler()
             .enable_time()
@@ -154,10 +162,10 @@ impl Raft {
             new_log_entry: None,
             apply_command_signal: Arc::new(Default::default()),
             keep_running: Arc::new(Default::default()),
+            election: Arc::new(election),
             thread_pool: Arc::new(thread_pool),
         };
 
-        // TODO: election timer.
         // TODO: read persist.
         this.keep_running.store(true, Ordering::SeqCst);
         // Running in a standalone thread.
@@ -169,12 +177,8 @@ impl Raft {
         this.schedule_heartbeats(Duration::from_millis(
             HEARTBEAT_INTERVAL_MILLIS,
         ));
-
-        // The last step is to start running elections.
-        let election_handle = this.clone();
-        this.thread_pool.spawn(async move {
-            election_handle.run_election();
-        });
+        // The last step is to start running election timer.
+        this.run_election_timer();
         this
     }
 
@@ -194,8 +198,8 @@ impl Raft {
             rf.current_term = args.term;
             rf.voted_for = None;
             rf.state = State::Follower;
-            rf.reset_election_timer();
-            rf.stop_current_election();
+
+            self.election.reset_election_timer();
             rf.persist();
         }
 
@@ -207,8 +211,8 @@ impl Raft {
                     && args.last_log_index >= last_log_index))
         {
             rf.voted_for = Some(args.candidate_id);
-            rf.reset_election_timer();
-            // No need to stop the election. We are not a candidate.
+
+            self.election.reset_election_timer();
             rf.persist();
 
             RequestVoteReply {
@@ -243,10 +247,10 @@ impl Raft {
         }
 
         rf.state = State::Follower;
-        rf.reset_election_timer();
-        rf.stop_current_election();
         rf.leader_id = args.leader_id;
 
+        self.election.reset_election_timer();
+
         if rf.log.len() <= args.prev_log_index
             || rf.log[args.prev_log_index].term != args.term
         {
@@ -283,21 +287,78 @@ impl Raft {
         }
     }
 
-    fn run_election(&self) {
+    fn run_election_timer(&self) {
+        let this = self.clone();
+        std::thread::spawn(move || {
+            // Current election cancel handle, might be None if no election is running.
+            let mut cancel_handle: Option<
+                futures::channel::oneshot::Sender<Term>,
+            > = None;
+            let election = this.election.clone();
+            while this.keep_running.load(Ordering::SeqCst) {
+                let (timed_out, timed_out_at) = {
+                    let mut guard = election.timer.lock();
+                    match guard.clone() {
+                        Some(timeout) => loop {
+                            let ret =
+                                election.signal.wait_until(&mut guard, timeout);
+                            let woken = !ret.timed_out();
+                            let updated = *guard != Some(timeout);
+                            let fired = Instant::now() < timeout;
+                            if woken || updated || fired {
+                                break (!woken && !updated, timeout);
+                            }
+                        },
+                        None => {
+                            election.signal.wait(&mut guard);
+                            // The timeout has changed, check again.
+                            (false, Instant::now())
+                        }
+                    }
+                };
+                // Whenever woken up, cancel the current running election.
+                // There are 3 cases we could reach here
+                // 1. We received an AppendEntries, or decided to vote for
+                // a peer, and thus turned into a follower. In this case we'll
+                // be notified by the election signal.
+                // 2. We are a follower but didn't receive a heartbeat on time,
+                // or we are a candidate but didn't not collect enough vote on
+                // time. In this case we'll have a timeout.
+                // 3. When become a leader, or are shutdown. In this case we'll
+                // be notified by the election signal.
+                if let Some(cancel_handle) = cancel_handle.take() {
+                    let _ = cancel_handle.send(Term(0));
+                }
+                if timed_out && this.keep_running.load(Ordering::SeqCst) {
+                    cancel_handle = this.run_election();
+                    // timeout must have been changed.
+                    assert_ne!(
+                        Some(timed_out_at),
+                        election.timer.lock().clone()
+                    );
+                }
+                // Now we block on the timeout again.
+            }
+        });
+    }
+
+    fn run_election(&self) -> Option<futures::channel::oneshot::Sender<Term>> {
         let me = self.me;
-        let (term, args, cancel_token) = {
+        let (term, args) = {
             let mut rf = self.inner_state.lock();
 
-            let (tx, rx) = futures::channel::oneshot::channel();
+            // The previous election is faster and reached the critical section
+            // before us. We should stop and not run this election.
+            if rf.state == State::Leader {
+                return None;
+            }
+
             rf.current_term.0 += 1;
 
             rf.voted_for = Some(me);
             rf.state = State::Candidate;
-            rf.reset_election_timer();
-            rf.stop_current_election();
-
-            rf.election_cancel_token.replace(tx);
 
+            self.election.reset_election_timer();
             rf.persist();
 
             let term = rf.current_term;
@@ -311,7 +372,6 @@ impl Raft {
                     last_log_index,
                     last_log_term,
                 },
-                rx,
             )
         };
 
@@ -331,14 +391,17 @@ impl Raft {
             }
         }
 
+        let (tx, rx) = futures::channel::oneshot::channel();
         self.thread_pool.spawn(Self::count_vote_util_cancelled(
             term,
             self.inner_state.clone(),
+            self.election.clone(),
             votes,
             self.peers.len() / 2,
-            cancel_token,
+            rx,
             self.new_log_entry.clone().unwrap(),
         ));
+        Some(tx)
     }
 
     const REQUEST_VOTE_RETRY: usize = 4;
@@ -360,6 +423,7 @@ impl Raft {
     async fn count_vote_util_cancelled(
         term: Term,
         rf: Arc<Mutex<RaftState>>,
+        election: Arc<ElectionState>,
         votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
         majority: usize,
         cancel_token: futures::channel::oneshot::Receiver<Term>,
@@ -398,6 +462,9 @@ impl Raft {
         }
         let mut rf = rf.lock();
         if rf.current_term == term && rf.state == State::Candidate {
+            // We are the leader now. The election timer can be stopped.
+            election.stop_election_timer();
+
             rf.state = State::Leader;
             let log_len = rf.log.len();
             for item in rf.next_index.iter_mut() {
@@ -413,10 +480,6 @@ impl Raft {
             new_log_entry
                 .send(None)
                 .expect("Triggering log entry syncing should not fail");
-
-            // Drop the timer and cancel token.
-            rf.election_cancel_token.take();
-            rf.election_timer.take();
             rf.persist();
         }
     }
@@ -654,32 +717,7 @@ impl Raft {
     }
 }
 
-const HEARTBEAT_INTERVAL_MILLIS: u64 = 150;
-const ELECTION_TIMEOUT_BASE_MILLIS: u64 = 150;
-const ELECTION_TIMEOUT_VAR_MILLIS: u64 = 250;
-
 impl RaftState {
-    fn reset_election_timer(&mut self) {
-        self.election_timer.as_mut().map(|timer| {
-            timer.reset(
-                (std::time::Instant::now() + Self::election_timeout()).into(),
-            )
-        });
-    }
-
-    fn election_timeout() -> Duration {
-        Duration::from_millis(
-            ELECTION_TIMEOUT_BASE_MILLIS
-                + thread_rng().gen_range(0, ELECTION_TIMEOUT_VAR_MILLIS),
-        )
-    }
-
-    fn stop_current_election(&mut self) {
-        self.election_cancel_token
-            .take()
-            .map(|sender| sender.send(self.current_term));
-    }
-
     fn persist(&self) {
         // TODO: implement
     }
@@ -694,3 +732,28 @@ impl RaftState {
         (len - 1, self.log.last().unwrap().term)
     }
 }
+
+const HEARTBEAT_INTERVAL_MILLIS: u64 = 150;
+const ELECTION_TIMEOUT_BASE_MILLIS: u64 = 150;
+const ELECTION_TIMEOUT_VAR_MILLIS: u64 = 250;
+
+impl ElectionState {
+    fn reset_election_timer(&self) {
+        let mut guard = self.timer.lock();
+        guard.replace(Instant::now() + Self::election_timeout());
+        self.signal.notify_one();
+    }
+
+    fn election_timeout() -> Duration {
+        Duration::from_millis(
+            ELECTION_TIMEOUT_BASE_MILLIS
+                + thread_rng().gen_range(0, ELECTION_TIMEOUT_VAR_MILLIS),
+        )
+    }
+
+    fn stop_election_timer(&self) {
+        let mut guard = self.timer.lock();
+        guard.take();
+        self.signal.notify_one();
+    }
+}