check_quorum.rs 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. use std::sync::atomic::Ordering;
  2. use std::time::Duration;
  3. use crate::{Raft, ReplicableCommand};
  4. impl<Command: ReplicableCommand> Raft<Command> {
  5. pub fn schedule_check_quorum(&self, interval: Duration) {
  6. let me = self.me;
  7. let keep_running = self.keep_running.clone();
  8. let rf = self.inner_state.clone();
  9. let election = self.election.clone();
  10. let persister = self.persister.clone();
  11. let verify_authority_daemon = self.verify_authority_daemon.clone();
  12. let heartbeats_daemon = self.heartbeats_daemon.clone();
  13. self.thread_pool.spawn(async move {
  14. let mut interval = tokio::time::interval(interval);
  15. while keep_running.load(Ordering::Relaxed) {
  16. let (is_leader, term) = {
  17. let rf = rf.lock();
  18. (rf.is_leader(), rf.current_term)
  19. };
  20. if !is_leader {
  21. // Skip the rest of the loop if we are not the leader.
  22. interval.tick().await;
  23. continue;
  24. }
  25. // Technically we shouldn't get beats if we are not the leader,
  26. // but it does not hurt since we acquired the soft term lock.
  27. let beats_moment = verify_authority_daemon.beats_moment();
  28. heartbeats_daemon.trigger(false);
  29. interval.tick().await;
  30. // If we had authority in the past, that means we have not lost
  31. // contact with followers. Keep going.
  32. if verify_authority_daemon.verify_beats_moment(beats_moment) {
  33. continue;
  34. }
  35. let mut rf = rf.lock();
  36. // Only step down if we are still the leader at the same term.
  37. if rf.is_leader() && rf.current_term == term {
  38. log::warn!("Leader {me:?} lost quorum, stepping down.");
  39. rf.step_down();
  40. election.reset_election_timer();
  41. persister.save_state(rf.persisted_state().into());
  42. }
  43. }
  44. });
  45. }
  46. }