|
|
@@ -301,23 +301,21 @@ impl Raft {
|
|
|
while this.keep_running.load(Ordering::SeqCst) {
|
|
|
let (timed_out, timer_count, timed_out_at) = {
|
|
|
let mut guard = election.timer.lock();
|
|
|
- match guard.clone() {
|
|
|
- (timer_count, Some(timeout)) => loop {
|
|
|
+ let (timer_count, deadline) = *guard;
|
|
|
+ // TODO: cancel if timer_counter changed since last election
|
|
|
+ // was scheduled.
|
|
|
+ match deadline {
|
|
|
+ Some(timeout) => loop {
|
|
|
let ret =
|
|
|
election.signal.wait_until(&mut guard, timeout);
|
|
|
- let woken = !ret.timed_out();
|
|
|
- let updated =
|
|
|
- *guard != (timer_count, Some(timeout));
|
|
|
- let fired = Instant::now() < timeout;
|
|
|
- if woken || updated || fired {
|
|
|
- break (
|
|
|
- !woken && !updated,
|
|
|
- timer_count,
|
|
|
- timeout,
|
|
|
- );
|
|
|
+ let updated = timer_count != guard.0;
|
|
|
+ let fired =
|
|
|
+ ret.timed_out() && Instant::now() < timeout;
|
|
|
+ if updated || fired {
|
|
|
+ break (!updated, timer_count, timeout);
|
|
|
}
|
|
|
},
|
|
|
- (timer_count, None) => {
|
|
|
+ None => {
|
|
|
election.signal.wait(&mut guard);
|
|
|
// The timeout has changed, check again.
|
|
|
(false, timer_count, Instant::now())
|
|
|
@@ -334,9 +332,8 @@ impl Raft {
|
|
|
// time. In this case we'll have a timeout.
|
|
|
// 3. When become a leader, or are shutdown. In this case we'll
|
|
|
// be notified by the election signal.
|
|
|
- if let Some(cancel_handle) = cancel_handle.take() {
|
|
|
- let _ = cancel_handle.send(());
|
|
|
- }
|
|
|
+ cancel_handle.take().map(|c| c.send(()));
|
|
|
+
|
|
|
if timed_out && this.keep_running.load(Ordering::SeqCst) {
|
|
|
cancel_handle = this.run_election(timer_count);
|
|
|
// timeout must have been changed.
|