Parcourir la source

Implement a timer version number for consistency.

Jing Yang il y a 5 ans
Parent
commit
18a98f3c27
1 fichiers modifiés avec 43 ajouts et 19 suppressions
  1. 43 19
      src/lib.rs

+ 43 - 19
src/lib.rs

@@ -64,7 +64,7 @@ struct RaftState {
 
 struct ElectionState {
     // Timer will be removed upon shutdown or elected.
-    timer: Mutex<Option<Instant>>,
+    timer: Mutex<(usize, Option<Instant>)>,
     // Wake up the timer thread when the timer is reset or cancelled.
     signal: Condvar,
 }
@@ -142,7 +142,7 @@ impl Raft {
         };
 
         let election = ElectionState {
-            timer: Mutex::new(None),
+            timer: Mutex::new((0, None)),
             signal: Condvar::new(),
         };
         election.reset_election_timer();
@@ -212,6 +212,9 @@ impl Raft {
         {
             rf.voted_for = Some(args.candidate_id);
 
+            // It is possible that we have set a timer above when updating the
+            // current term. It does not hurt to update the timer again.
+            // We do need to persist, though.
             self.election.reset_election_timer();
             rf.persist();
 
@@ -292,27 +295,32 @@ impl Raft {
         std::thread::spawn(move || {
             // Current election cancel handle, might be None if no election is running.
             let mut cancel_handle: Option<
-                futures::channel::oneshot::Sender<Term>,
+                futures::channel::oneshot::Sender<()>,
             > = None;
             let election = this.election.clone();
             while this.keep_running.load(Ordering::SeqCst) {
-                let (timed_out, timed_out_at) = {
+                let (timed_out, timer_count, timed_out_at) = {
                     let mut guard = election.timer.lock();
                     match guard.clone() {
-                        Some(timeout) => loop {
+                        (timer_count, Some(timeout)) => loop {
                             let ret =
                                 election.signal.wait_until(&mut guard, timeout);
                             let woken = !ret.timed_out();
-                            let updated = *guard != Some(timeout);
+                            let updated =
+                                *guard != (timer_count, Some(timeout));
                             let fired = Instant::now() < timeout;
                             if woken || updated || fired {
-                                break (!woken && !updated, timeout);
+                                break (
+                                    !woken && !updated,
+                                    timer_count,
+                                    timeout,
+                                );
                             }
                         },
-                        None => {
+                        (timer_count, None) => {
                             election.signal.wait(&mut guard);
                             // The timeout has changed, check again.
-                            (false, Instant::now())
+                            (false, timer_count, Instant::now())
                         }
                     }
                 };
@@ -327,14 +335,14 @@ impl Raft {
                 // 3. When become a leader, or are shutdown. In this case we'll
                 // be notified by the election signal.
                 if let Some(cancel_handle) = cancel_handle.take() {
-                    let _ = cancel_handle.send(Term(0));
+                    let _ = cancel_handle.send(());
                 }
                 if timed_out && this.keep_running.load(Ordering::SeqCst) {
-                    cancel_handle = this.run_election();
+                    cancel_handle = this.run_election(timer_count);
                     // timeout must have been changed.
                     assert_ne!(
-                        Some(timed_out_at),
-                        election.timer.lock().clone()
+                        (timer_count, Some(timed_out_at)),
+                        election.timer.lock().clone(),
                     );
                 }
                 // Now we block on the timeout again.
@@ -342,14 +350,18 @@ impl Raft {
         });
     }
 
-    fn run_election(&self) -> Option<futures::channel::oneshot::Sender<Term>> {
+    fn run_election(
+        &self,
+        timer_count: usize,
+    ) -> Option<futures::channel::oneshot::Sender<()>> {
         let me = self.me;
         let (term, args) = {
             let mut rf = self.inner_state.lock();
 
             // The previous election is faster and reached the critical section
             // before us. We should stop and not run this election.
-            if rf.state == State::Leader {
+            // Or someone else increased the term and the timer is reset.
+            if !self.election.try_reset_election_timer(timer_count) {
                 return None;
             }
 
@@ -358,7 +370,6 @@ impl Raft {
             rf.voted_for = Some(me);
             rf.state = State::Candidate;
 
-            self.election.reset_election_timer();
             rf.persist();
 
             let term = rf.current_term;
@@ -426,7 +437,7 @@ impl Raft {
         election: Arc<ElectionState>,
         votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
         majority: usize,
-        cancel_token: futures::channel::oneshot::Receiver<Term>,
+        cancel_token: futures::channel::oneshot::Receiver<()>,
         new_log_entry: std::sync::mpsc::Sender<Option<Peer>>,
     ) {
         let mut vote_count = 0;
@@ -740,8 +751,20 @@ const ELECTION_TIMEOUT_VAR_MILLIS: u64 = 250;
 impl ElectionState {
     fn reset_election_timer(&self) {
         let mut guard = self.timer.lock();
-        guard.replace(Instant::now() + Self::election_timeout());
+        guard.0 += 1;
+        guard.1.replace(Instant::now() + Self::election_timeout());
+        self.signal.notify_one();
+    }
+
+    fn try_reset_election_timer(&self, timer_count: usize) -> bool {
+        let mut guard = self.timer.lock();
+        if guard.0 != timer_count {
+            return false;
+        }
+        guard.0 += 1;
+        guard.1.replace(Instant::now() + Self::election_timeout());
         self.signal.notify_one();
+        true
     }
 
     fn election_timeout() -> Duration {
@@ -753,7 +776,8 @@ impl ElectionState {
 
     fn stop_election_timer(&self) {
         let mut guard = self.timer.lock();
-        guard.take();
+        guard.0 += 1;
+        guard.1.take();
         self.signal.notify_one();
     }
 }