Kaynağa Gözat

More refactor and more comments.

Jing Yang 5 yıl önce
ebeveyn
işleme
e10b6ce01f
1 değiştirilmiş dosya ile 22 ekleme ve 20 silme
  1. 22 20
      src/lib.rs

+ 22 - 20
src/lib.rs

@@ -293,13 +293,15 @@ impl Raft {
     fn run_election_timer(&self) {
         let this = self.clone();
         std::thread::spawn(move || {
-            // Current election cancel handle, might be None if no election is running.
-            let mut cancel_handle: Option<
-                futures::channel::oneshot::Sender<()>,
-            > = None;
             let election = this.election.clone();
+
+            let mut should_run = None;
             while this.keep_running.load(Ordering::SeqCst) {
-                let (timed_out, timer_count, timed_out_at) = {
+                let cancel_handle = should_run
+                    .map(|last_timer_count| this.run_election(last_timer_count))
+                    .flatten();
+
+                should_run = {
                     let mut guard = election.timer.lock();
                     let (timer_count, deadline) = *guard;
                     // TODO: cancel if timer_counter changed since last election
@@ -308,17 +310,27 @@ impl Raft {
                         Some(timeout) => loop {
                             let ret =
                                 election.signal.wait_until(&mut guard, timeout);
-                            let updated = timer_count != guard.0;
                             let fired =
                                 ret.timed_out() && Instant::now() < timeout;
-                            if updated || fired {
-                                break (!updated, timer_count, timeout);
+                            // If the timer has been updated, do not schedule,
+                            // break so that we could cancel.
+                            if timer_count != guard.0 {
+                                // Timer has been updated, cancel current
+                                // election, and block on timeout again.
+                                break None;
+                            } else if fired {
+                                // Timer has fired, remove the timer and allow
+                                // running the next election at timer_count.
+                                // If the next election is cancelled before we
+                                // are back on wait, timer_count will be set to
+                                // a different value.
+                                break Some(timer_count);
                             }
                         },
                         None => {
                             election.signal.wait(&mut guard);
                             // The timeout has changed, check again.
-                            (false, timer_count, Instant::now())
+                            None
                         }
                     }
                 };
@@ -332,17 +344,7 @@ impl Raft {
                 // time. In this case we'll have a timeout.
                 // 3. When become a leader, or are shutdown. In this case we'll
                 // be notified by the election signal.
-                cancel_handle.take().map(|c| c.send(()));
-
-                if timed_out && this.keep_running.load(Ordering::SeqCst) {
-                    cancel_handle = this.run_election(timer_count);
-                    // timeout must have been changed.
-                    assert_ne!(
-                        (timer_count, Some(timed_out_at)),
-                        election.timer.lock().clone(),
-                    );
-                }
-                // Now we block on the timeout again.
+                cancel_handle.map(|c| c.send(()));
             }
         });
     }