process_request_vote.rs 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. use std::time::{Duration, Instant};
  2. use crate::{
  3. election::ELECTION_TIMEOUT_BASE_MILLIS, Raft, RequestVoteArgs,
  4. RequestVoteReply,
  5. };
  6. // Command must be
  7. // 1. clone: they are copied to the persister.
  8. // 2. serialize: they are converted to bytes to persist.
  9. impl<Command: Clone + serde::Serialize> Raft<Command> {
  10. pub fn process_request_vote(
  11. &self,
  12. args: RequestVoteArgs,
  13. ) -> RequestVoteReply {
  14. // Note: do not change this to `let _ = ...`.
  15. let _guard = self.daemon_env.for_scope();
  16. let mut rf = self.inner_state.lock();
  17. let term = rf.current_term;
  18. if args.prevote {
  19. let last_log = rf.log.last_index_term();
  20. let timed_out = Self::heartbeat_timed_out(rf.last_heartbeat());
  21. let longer_log = args.last_log_term > last_log.term
  22. || (args.last_log_term == last_log.term
  23. && args.last_log_index >= last_log.index);
  24. return RequestVoteReply {
  25. term: args.term,
  26. vote_granted: args.term >= term && longer_log && timed_out,
  27. };
  28. }
  29. #[allow(clippy::comparison_chain)]
  30. if args.term < term {
  31. return RequestVoteReply {
  32. term,
  33. vote_granted: false,
  34. };
  35. } else if args.term > term {
  36. rf.current_term = args.term;
  37. rf.step_down();
  38. self.election.reset_election_timer();
  39. self.persister.save_state(rf.persisted_state().into());
  40. }
  41. let voted_for = rf.voted_for;
  42. let last_log = rf.log.last_index_term();
  43. if (voted_for.is_none() || voted_for == Some(args.candidate_id))
  44. && (args.last_log_term > last_log.term
  45. || (args.last_log_term == last_log.term
  46. && args.last_log_index >= last_log.index))
  47. {
  48. rf.voted_for = Some(args.candidate_id);
  49. // It is possible that we have set a timer above when updating the
  50. // current term. It does not hurt to update the timer again.
  51. // We do need to persist, though.
  52. self.election.reset_election_timer();
  53. self.persister.save_state(rf.persisted_state().into());
  54. RequestVoteReply {
  55. term: args.term,
  56. vote_granted: true,
  57. }
  58. } else {
  59. RequestVoteReply {
  60. term: args.term,
  61. vote_granted: false,
  62. }
  63. }
  64. }
  65. fn heartbeat_timed_out(last_heartbeat: Option<Instant>) -> bool {
  66. let Some(last_heartbeat) = last_heartbeat else {
  67. return true;
  68. };
  69. return last_heartbeat
  70. .checked_add(Duration::from_millis(ELECTION_TIMEOUT_BASE_MILLIS))
  71. .unwrap()
  72. .le(&Instant::now());
  73. }
  74. }