Procházet zdrojové kódy

Make run_election_timer() a simple function.

Jing Yang před 3 roky
rodič
revize
10ac3235eb
2 změnil soubory, kde provedl 83 přidání a 86 odebrání
  1. 79 84
      src/election.rs
  2. 4 2
      src/raft.rs

+ 79 - 84
src/election.rs

@@ -126,93 +126,88 @@ impl<Command: ReplicableCommand> Raft<Command> {
     /// election timer. There could be more than one vote-counting tasks running
     /// at the same time, but all earlier tasks except the newest one will
     /// eventually realize the term they were competing for has passed and quit.
-    pub(crate) fn run_election_timer(&self) -> impl FnOnce() {
-        let this = self.clone();
-
-        move || {
-            log::info!("{:?} election timer daemon running ...", this.me);
-
-            let election = this.election.clone();
-
-            let mut should_run = None;
-            while this.keep_running.load(Ordering::Relaxed) {
-                let mut cancel_handle =
-                    should_run.and_then(|last_timer_count| {
-                        this.run_election(last_timer_count)
-                    });
-
-                let mut guard = election.timer.lock();
-                let (timer_count, deadline) = (guard.version, guard.deadline);
-                // If the timer is reset
-                // 0. Zero times. We know should_run is None. If should_run has
-                // a value, the election would have been started and the timer
-                // reset by the election. That means the timer did not fire in
-                // the last iteration. We should just wait.
-                // 1. One time. We know that the timer is either reset by the
-                // election or by someone else before the election, in which
-                // case the election was never started. We should just wait.
-                // 2. More than one time. We know that the timer is first reset
-                // by the election, and then reset by someone else, in that
-                // order. We should cancel the election and just wait.
-                if let Some(last_timer_count) = should_run {
-                    let expected_timer_count = last_timer_count + 1;
-                    assert!(timer_count >= expected_timer_count);
-                    // If the timer was changed more than once, we know the
-                    // last scheduled election should have been cancelled.
-                    if timer_count > expected_timer_count {
-                        cancel_handle.take().map(|c| c.send(()));
-                    }
-                }
-                // check the running signal before sleeping. We are holding the
-                // timer lock, so no one can change it. The kill() method will
-                // not be able to notify this thread before `wait` is called.
-                if !this.keep_running.load(Ordering::Relaxed) {
-                    break;
+    pub(crate) fn run_election_timer(&self) {
+        log::info!("{:?} election timer daemon running ...", self.me);
+
+        let election = self.election.clone();
+
+        let mut should_run = None;
+        while self.keep_running.load(Ordering::Relaxed) {
+            let mut cancel_handle = should_run.and_then(|last_timer_count| {
+                self.run_election(last_timer_count)
+            });
+
+            let mut guard = election.timer.lock();
+            let (timer_count, deadline) = (guard.version, guard.deadline);
+            // If the timer is reset
+            // 0. Zero times. We know should_run is None. If should_run has
+            // a value, the election would have been started and the timer
+            // reset by the election. That means the timer did not fire in
+            // the last iteration. We should just wait.
+            // 1. One time. We know that the timer is either reset by the
+            // election or by someone else before the election, in which
+            // case the election was never started. We should just wait.
+            // 2. More than one time. We know that the timer is first reset
+            // by the election, and then reset by someone else, in that
+            // order. We should cancel the election and just wait.
+            if let Some(last_timer_count) = should_run {
+                let expected_timer_count = last_timer_count + 1;
+                assert!(timer_count >= expected_timer_count);
+                // If the timer was changed more than once, we know the
+                // last scheduled election should have been cancelled.
+                if timer_count > expected_timer_count {
+                    cancel_handle.take().map(|c| c.send(()));
                 }
-                should_run = match deadline {
-                    Some(timeout) => loop {
-                        let ret =
-                            election.signal.wait_until(&mut guard, timeout);
-                        let fired = ret.timed_out() && Instant::now() > timeout;
-                        // If the timer has been updated, do not schedule,
-                        // break so that we could cancel.
-                        if timer_count != guard.version {
-                            // Timer has been updated, cancel current
-                            // election, and block on timeout again.
-                            break None;
-                        } else if fired {
-                            // Timer has fired, remove the timer and allow
-                            // running the next election at timer_count.
-                            // If the next election is cancelled before we
-                            // are back on wait, timer_count will be set to
-                            // a different value.
-                            guard.version += 1;
-                            guard.deadline.take();
-                            break Some(guard.version);
-                        }
-                    },
-                    None => {
-                        election.signal.wait(&mut guard);
-                        // The timeout has changed, check again.
-                        None
-                    }
-                };
-                drop(guard);
-                // Whenever woken up, cancel the current running election.
-                // There are 3 cases we could reach here
-                // 1. We received an AppendEntries, or decided to vote for
-                // a peer, and thus turned into a follower. In this case we'll
-                // be notified by the election signal.
-                // 2. We are a follower but didn't receive a heartbeat on time,
-                // or we are a candidate but didn't not collect enough vote on
-                // time. In this case we'll have a timeout.
-                // 3. When become a leader, or are shutdown. In this case we'll
-                // be notified by the election signal.
-                cancel_handle.map(|c| c.send(()));
             }
-
-            log::info!("{:?} election timer daemon done.", this.me);
+            // check the running signal before sleeping. We are holding the
+            // timer lock, so no one can change it. The kill() method will
+            // not be able to notify this thread before `wait` is called.
+            if !self.keep_running.load(Ordering::Relaxed) {
+                break;
+            }
+            should_run = match deadline {
+                Some(timeout) => loop {
+                    let ret = election.signal.wait_until(&mut guard, timeout);
+                    let fired = ret.timed_out() && Instant::now() > timeout;
+                    // If the timer has been updated, do not schedule,
+                    // break so that we could cancel.
+                    if timer_count != guard.version {
+                        // Timer has been updated, cancel current
+                        // election, and block on timeout again.
+                        break None;
+                    } else if fired {
+                        // Timer has fired, remove the timer and allow
+                        // running the next election at timer_count.
+                        // If the next election is cancelled before we
+                        // are back on wait, timer_count will be set to
+                        // a different value.
+                        guard.version += 1;
+                        guard.deadline.take();
+                        break Some(guard.version);
+                    }
+                    // Alarm has not fired yet. Continue to wait.
+                },
+                None => {
+                    election.signal.wait(&mut guard);
+                    // The timeout has changed, check again.
+                    None
+                }
+            };
+            drop(guard);
+            // Whenever woken up, cancel the current running election.
+            // There are 3 cases we could reach here
+            // 1. We received an AppendEntries, or decided to vote for
+            // a peer, and thus turned into a follower. In this case we'll
+            // be notified by the election signal.
+            // 2. We are a follower but didn't receive a heartbeat on time,
+            // or we are a candidate but didn't not collect enough vote on
+            // time. In this case we'll have a timeout.
+            // 3. When become a leader, or are shutdown. In this case we'll
+            // be notified by the election signal.
+            cancel_handle.map(|c| c.send(()));
         }
+
+        log::info!("{:?} election timer daemon done.", self.me);
     }
 
     fn run_election(

+ 4 - 2
src/raft.rs

@@ -177,8 +177,10 @@ impl<Command: ReplicableCommand> Raft<Command> {
         // internal thread pool.
         this.schedule_heartbeats(HEARTBEAT_INTERVAL);
         // The last step is to start running election timer.
-        let election_timer = this.run_election_timer();
-        daemon_watch.create_daemon(Daemon::ElectionTimer, election_timer);
+        daemon_watch.create_daemon(Daemon::ElectionTimer, {
+            let raft = this.clone();
+            move || raft.run_election_timer()
+        });
 
         // Create the join handle
         this.join_handle.lock().replace(RaftJoinHandle {