Sfoglia il codice sorgente

Refuse prevote if regularly receiving heartbeats. #10

The disruptive_liveness test will fail. This is expected.
Jing Yang 3 anni fa
parent
commit
22b3c73eaf
3 ha cambiato i file con 40 aggiunte e 7 eliminazioni
  1. 1 1
      src/election.rs
  2. 19 2
      src/process_request_vote.rs
  3. 20 4
      src/raft_state.rs

+ 1 - 1
src/election.rs

@@ -26,7 +26,7 @@ pub(crate) struct ElectionState {
     signal: Condvar,
 }
 
-const ELECTION_TIMEOUT_BASE_MILLIS: u64 = 200;
+pub(crate) const ELECTION_TIMEOUT_BASE_MILLIS: u64 = 200;
 const ELECTION_TIMEOUT_VAR_MILLIS: u64 = 200;
 impl ElectionState {
     pub(crate) fn create() -> Self {

+ 19 - 2
src/process_request_vote.rs

@@ -1,4 +1,9 @@
-use crate::{Raft, RequestVoteArgs, RequestVoteReply};
+use std::time::{Duration, Instant};
+
+use crate::{
+    election::ELECTION_TIMEOUT_BASE_MILLIS, Raft, RequestVoteArgs,
+    RequestVoteReply,
+};
 
 // Command must be
 // 1. clone: they are copied to the persister.
@@ -17,12 +22,13 @@ impl<Command: Clone + serde::Serialize> Raft<Command> {
 
         if args.prevote {
             let last_log = rf.log.last_index_term();
+            let timed_out = Self::heartbeat_timed_out(rf.last_heartbeat());
             let longer_log = args.last_log_term > last_log.term
                 || (args.last_log_term == last_log.term
                     && args.last_log_index >= last_log.index);
             return RequestVoteReply {
                 term: args.term,
-                vote_granted: args.term >= term && longer_log,
+                vote_granted: args.term >= term && longer_log && timed_out,
             };
         }
 
@@ -66,4 +72,15 @@ impl<Command: Clone + serde::Serialize> Raft<Command> {
             }
         }
     }
+
+    fn heartbeat_timed_out(last_heartbeat: Option<Instant>) -> bool {
+        let Some(last_heartbeat) = last_heartbeat else {
+            return true;
+        };
+
+        return last_heartbeat
+            .checked_add(Duration::from_millis(ELECTION_TIMEOUT_BASE_MILLIS))
+            .unwrap()
+            .le(&Instant::now());
+    }
 }

+ 20 - 4
src/raft_state.rs

@@ -1,15 +1,22 @@
+use std::time::Instant;
+
 use crate::{
     log_array::LogArray, persister::PersistedRaftState, Index, Peer, Term,
 };
 
 #[derive(Copy, Clone, Debug, Eq, PartialEq)]
 pub(crate) enum State {
-    Follower,
+    Follower(FollowerData),
     Prevote,
     Candidate,
     Leader,
 }
 
+#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
+pub(crate) struct FollowerData {
+    last_heartbeat: Option<Instant>,
+}
+
 #[repr(align(64))]
 pub(crate) struct RaftState<Command> {
     pub current_term: Term,
@@ -33,19 +40,28 @@ impl<Command> RaftState<Command> {
             log: LogArray::create(),
             commit_index: 0,
             match_index: vec![0; peer_size],
-            state: State::Follower,
+            state: State::Follower(FollowerData::default()),
             leader_id: me,
         }
     }
 
     pub fn step_down(&mut self) {
         self.voted_for = None;
-        self.state = State::Follower;
+        self.state = State::Follower(FollowerData::default());
     }
 
     pub fn meet_leader(&mut self, leader_id: Peer) {
         self.leader_id = leader_id;
-        self.state = State::Follower;
+        self.state = State::Follower(FollowerData {
+            last_heartbeat: Some(Instant::now()),
+        })
+    }
+
+    pub fn last_heartbeat(&self) -> Option<Instant> {
+        let State::Follower(follower_data) = &self.state else {
+            return None;
+        };
+        return follower_data.last_heartbeat;
     }
 }