Parcourir la source

Commit a new log entry immediately after being elected.

Jing Yang il y a 3 ans
Parent
commit
9d63f1d698
3 fichiers modifiés avec 34 ajouts et 7 suppressions
  1. 14 2
      src/election.rs
  2. 4 0
      src/raft_state.rs
  3. 16 5
      src/verify_authority.rs

+ 14 - 2
src/election.rs

@@ -9,7 +9,9 @@ use crate::daemon_env::Daemon;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
 use crate::verify_authority::VerifyAuthorityDaemon;
-use crate::{Peer, Raft, RaftState, RemoteRaft, RequestVoteArgs, State, Term};
+use crate::{
+    Peer, Persister, Raft, RaftState, RemoteRaft, RequestVoteArgs, State, Term,
+};
 
 #[derive(Default)]
 pub(crate) struct ElectionState {
@@ -70,7 +72,7 @@ impl ElectionState {
 // 3. serialize: they are converted to bytes to persist.
 impl<Command> Raft<Command>
 where
-    Command: 'static + Clone + Send + serde::Serialize,
+    Command: 'static + Clone + Default + Send + serde::Serialize,
 {
     /// Runs the election timer daemon that triggers elections.
     ///
@@ -279,6 +281,7 @@ where
             self.election.clone(),
             self.new_log_entry.clone().unwrap(),
             self.verify_authority_daemon.clone(),
+            self.persister.clone(),
         ));
         Some(tx)
     }
@@ -314,6 +317,7 @@ where
         election: Arc<ElectionState>,
         new_log_entry: SharedSender<Option<Peer>>,
         verify_authority_daemon: VerifyAuthorityDaemon,
+        persister: Arc<dyn Persister>,
     ) {
         let quorum = votes.len() >> 1;
         let mut vote_count = 0;
@@ -374,6 +378,14 @@ where
             // done, since we are holding the raft lock.
             verify_authority_daemon.reset_state(term);
 
+            if rf.commit_index != rf.log.last_index_term().index {
+                rf.sentinel_commit_index =
+                    rf.log.add_command(term, Default::default());
+                persister.save_state(rf.persisted_state().into());
+            } else {
+                rf.sentinel_commit_index = rf.commit_index;
+            }
+
             // Sync all logs now.
             let _ = new_log_entry.send(None);
         }

+ 4 - 0
src/raft_state.rs

@@ -25,6 +25,9 @@ pub(crate) struct RaftState<Command> {
     pub state: State,
 
     pub leader_id: Peer,
+
+    // Index of the first commit of each term as the leader.
+    pub sentinel_commit_index: Index,
 }
 
 impl<Command: Default> RaftState<Command> {
@@ -40,6 +43,7 @@ impl<Command: Default> RaftState<Command> {
             current_step: vec![0; peer_size],
             state: State::Follower,
             leader_id: me,
+            sentinel_commit_index: 0,
         }
     }
 }

+ 16 - 5
src/verify_authority.rs

@@ -152,6 +152,7 @@ impl VerifyAuthorityDaemon {
         &self,
         current_term: Term,
         commit_index: Index,
+        sentinel_commit_index: Index,
     ) {
         // Opportunistic check: do nothing if we don't have any requests.
         if self.state.lock().queue.is_empty() {
@@ -159,7 +160,14 @@ impl VerifyAuthorityDaemon {
         }
 
         self.clear_committed_requests(current_term, commit_index);
-        self.clear_ticked_requests();
+        // Do not use ticks to clear requests if we have not committed at least
+        // one log entry since the start of the term. At the start of the term,
+        // the leader might not know the commit index of the previous leader.
+        // This holds true even it is guaranteed that all entries committed by
+        // the previous leader will be committed by the current leader.
+        if commit_index >= sentinel_commit_index {
+            self.clear_ticked_requests();
+        }
         self.removed_expired_requests(current_term);
     }
 
@@ -296,12 +304,15 @@ impl<Command: 'static + Send> Raft<Command> {
         let join_handle = std::thread::spawn(move || {
             while keep_running.load(Ordering::Acquire) {
                 parker.park_timeout(Self::BEAT_RECORDING_MAX_PAUSE);
-                let (current_term, commit_index) = {
+                let (current_term, commit_index, sentinel) = {
                     let rf = rf.lock();
-                    (rf.current_term, rf.commit_index)
+                    (rf.current_term, rf.commit_index, rf.sentinel_commit_index)
                 };
-                this_daemon
-                    .run_verify_authority_iteration(current_term, commit_index);
+                this_daemon.run_verify_authority_iteration(
+                    current_term,
+                    commit_index,
+                    sentinel,
+                );
             }
         });
         self.daemon_env