Преглед на файлове

Move sentinel_commit_index to verify_authority_daemon.

It is not correct to put the value in RaftState. Imagine the following
scenario:
1. The daemon took a snapshot of last term's commit index and sentinel index.
Let's assume the sentinel has been committed.
2. We were elected as the leader of the new term but cannot commit the first
entry.
3. We received a verify authority request at the current commit index. Note
the commit index is inaccurate, since it is from another term.
4. The daemon went ahead and cleared ticked requests based on the stale
sentinel index. This is not correct.
Jing Yang преди 3 години
родител
ревизия
9523f609d3
променени са 3 файла, в които са добавени 93 реда и са изтрити 112 реда
  1. 11 8
      src/election.rs
  2. 0 4
      src/raft_state.rs
  3. 82 100
      src/verify_authority.rs

+ 11 - 8
src/election.rs

@@ -362,20 +362,23 @@ impl<Command: ReplicableCommand> Raft<Command> {
             for item in rf.current_step.iter_mut() {
                 *item = 0;
             }
-            // Reset the verify authority daemon before sending heartbeats to
-            // followers. This is critical to the correctness of verifying
-            // authority.
-            // No verity authority request can go through before the reset is
-            // done, since we are holding the raft lock.
-            verify_authority_daemon.reset_state(term);
 
+            let sentinel_commit_index;
             if rf.commit_index != rf.log.last_index_term().index {
-                rf.sentinel_commit_index = rf.log.add_term_change_entry(term);
+                // Create a sentinel commit at the start of the term.
+                sentinel_commit_index = rf.log.add_term_change_entry(term);
                 persister.save_state(rf.persisted_state().into());
             } else {
-                rf.sentinel_commit_index = rf.commit_index;
+                sentinel_commit_index = rf.commit_index;
             }
 
+            // Reset the verify authority daemon before sending heartbeats to
+            // followers. This is critical to the correctness of verifying
+            // authority.
+            // No verity authority request can go through before the reset is
+            // done, since we are holding the raft lock.
+            verify_authority_daemon.reset_state(term, sentinel_commit_index);
+
             // Sync all logs now.
             let _ = new_log_entry.send(None);
         }

+ 0 - 4
src/raft_state.rs

@@ -25,9 +25,6 @@ pub(crate) struct RaftState<Command> {
     pub state: State,
 
     pub leader_id: Peer,
-
-    // Index of the first commit of each term as the leader.
-    pub sentinel_commit_index: Index,
 }
 
 impl<Command> RaftState<Command> {
@@ -43,7 +40,6 @@ impl<Command> RaftState<Command> {
             current_step: vec![0; peer_size],
             state: State::Follower,
             leader_id: me,
-            sentinel_commit_index: 0,
         }
     }
 }

+ 82 - 100
src/verify_authority.rs

@@ -47,6 +47,8 @@ struct VerifyAuthorityState {
     /// corresponding peer.
     /// These indexes include all processed requests. They will never go down.
     covered: Vec<QueueIndex>,
+    /// The index of the first commit, created at the start of the term.
+    sentinel_commit_index: Index,
 }
 
 impl VerifyAuthorityState {
@@ -56,14 +58,16 @@ impl VerifyAuthorityState {
             queue: VecDeque::new(),
             start: QueueIndex(0),
             covered: vec![QueueIndex(0); peer_count],
+            sentinel_commit_index: 0,
         }
     }
 
-    pub fn reset(&mut self, term: Term) {
+    pub fn reset(&mut self, term: Term, sentinel_commit_index: Index) {
         self.clear_tickets();
 
         self.term = term;
         self.start = QueueIndex(0);
+        self.sentinel_commit_index = sentinel_commit_index;
         for item in self.covered.iter_mut() {
             *item = QueueIndex(0)
         }
@@ -118,8 +122,8 @@ impl VerifyAuthorityDaemon {
         self.condvar.wait_for(&mut guard, timeout);
     }
 
-    pub fn reset_state(&self, term: Term) {
-        self.state.lock().reset(term);
+    pub fn reset_state(&self, term: Term, sentinel_commit_index: Index) {
+        self.state.lock().reset(term, sentinel_commit_index);
         // Increase all beats by one to make sure upcoming verify authority
         // requests wait for beats in the current term. This in fact creates
         // phantom beats that will never be marked as completed by themselves.
@@ -178,7 +182,6 @@ impl VerifyAuthorityDaemon {
         &self,
         current_term: Term,
         commit_index: Index,
-        sentinel_commit_index: Index,
     ) {
         // Opportunistic check: do nothing if we don't have any requests.
         if self.state.lock().queue.is_empty() {
@@ -186,14 +189,7 @@ impl VerifyAuthorityDaemon {
         }
 
         self.clear_committed_requests(current_term, commit_index);
-        // Do not use ticks to clear requests if we have not committed at least
-        // one log entry since the start of the term. At the start of the term,
-        // the leader might not know the commit index of the previous leader.
-        // This holds true even it is guaranteed that all entries committed by
-        // the previous leader will be committed by the current leader.
-        if commit_index >= sentinel_commit_index {
-            self.clear_ticked_requests();
-        }
+        self.clear_ticked_requests(commit_index);
         self.remove_expired_requests(current_term);
     }
 
@@ -262,7 +258,16 @@ impl VerifyAuthorityDaemon {
     /// Fetches the newest successful RPC response from peers, and mark verify
     /// authority requests as complete if they are covered by more than half of
     /// the replicas.
-    fn clear_ticked_requests(&self) {
+    fn clear_ticked_requests(&self, commit_index: Index) {
+        // Do not use ticks to clear requests if we have not committed at least
+        // one log entry since the start of the term. At the start of the term,
+        // the leader might not know the commit index of the previous leader.
+        // This holds true even it is guaranteed that all entries committed by
+        // the previous leader will be committed by the current leader.
+        if commit_index < self.state.lock().sentinel_commit_index {
+            return;
+        }
+
         for (peer_index, beat_ticker) in self.beat_tickers.iter().enumerate() {
             // Fetches the newest successful RPC response from the current peer.
             let ticked = beat_ticker.ticked();
@@ -363,7 +368,7 @@ impl VerifyAuthorityDaemon {
         // Fail all inflight verify authority requests. It is important to do
         // this so that the RPC framework could drop requests served by us and
         // release all references to the Raft instance.
-        self.reset_state(term);
+        self.reset_state(term, Index::MAX);
         self.condvar.notify_all();
     }
 }
@@ -382,15 +387,12 @@ impl<Command: 'static + Send> Raft<Command> {
             log::info!("{:?} verify authority daemon running ...", me);
             while keep_running.load(Ordering::Acquire) {
                 this_daemon.wait_for(Self::BEAT_RECORDING_MAX_PAUSE);
-                let (current_term, commit_index, sentinel) = {
+                let (current_term, commit_index) = {
                     let rf = rf.lock();
-                    (rf.current_term, rf.commit_index, rf.sentinel_commit_index)
+                    (rf.current_term, rf.commit_index)
                 };
-                this_daemon.run_verify_authority_iteration(
-                    current_term,
-                    commit_index,
-                    sentinel,
-                );
+                this_daemon
+                    .run_verify_authority_iteration(current_term, commit_index);
             }
             log::info!("{:?} verify authority daemon done.", me);
         };
@@ -465,7 +467,7 @@ mod tests {
     fn init_daemon() -> VerifyAuthorityDaemon {
         let daemon = VerifyAuthorityDaemon::create(PEER_SIZE);
 
-        daemon.reset_state(TERM);
+        daemon.reset_state(TERM, COMMIT_INDEX);
 
         const CURRENT_BEATS: [u64; 5] = [11, 9, 7, 5, 3];
         const TICKED: [u64; 5] = [0, 3, 1, 4, 2];
@@ -568,16 +570,24 @@ mod tests {
         let t1 = daemon.verify_authority_async(TERM, COMMIT_INDEX - 1);
         let t2 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
 
-        daemon.reset_state(NEXT_TERM);
+        daemon.reset_state(NEXT_TERM, COMMIT_INDEX + 1);
         const CURRENT_BEATS: [u64; 5] = [12, 10, 8, 6, 4];
         for (index, beat_ticker) in daemon.beat_tickers.iter().enumerate() {
             assert_eq!(CURRENT_BEATS[index], beat_ticker.current_beat().0);
         }
 
-        assert_queue_len!(&daemon, 0);
         assert_ticket_ready!(t0, VerifyAuthorityResult::TermElapsed);
         assert_ticket_ready!(t1, VerifyAuthorityResult::TermElapsed);
         assert_ticket_ready!(t2, VerifyAuthorityResult::TermElapsed);
+
+        let state = daemon.state.lock();
+        assert_eq!(0, state.queue.len());
+        assert_eq!(0, state.start.0);
+        assert_eq!(COMMIT_INDEX + 1, state.sentinel_commit_index);
+        assert_eq!(NEXT_TERM, state.term);
+        for covered in &state.covered {
+            assert_eq!(0, covered.0);
+        }
     }
 
     #[test]
@@ -589,24 +599,16 @@ mod tests {
         let t3 = daemon.verify_authority_async(TERM, COMMIT_INDEX - 2);
         let t4 = daemon.verify_authority_async(TERM, COMMIT_INDEX + 1);
         // Run one iteration: no new commit, no new tick, for last term.
-        daemon.run_verify_authority_iteration(
-            PAST_TERM,
-            COMMIT_INDEX,
-            COMMIT_INDEX,
-        );
+        daemon.run_verify_authority_iteration(PAST_TERM, COMMIT_INDEX);
         // Tokens should stay as-is.
         assert_queue_len!(&daemon, 5);
 
         // Run one iteration: no new commit, no new tick, for next term.
-        daemon.run_verify_authority_iteration(
-            NEXT_TERM,
-            COMMIT_INDEX,
-            COMMIT_INDEX,
-        );
+        daemon.run_verify_authority_iteration(NEXT_TERM, COMMIT_INDEX);
         // Tokens should stay as-is.
         assert_queue_len!(&daemon, 5);
 
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
+        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
         assert_queue_len!(&daemon, 3);
         {
             let queue = &daemon.state.lock().queue;
@@ -623,12 +625,12 @@ mod tests {
         let t3 = assert_ticket_pending!(t3);
         let t4 = assert_ticket_pending!(t4);
 
-        daemon.run_verify_authority_iteration(
-            TERM,
-            COMMIT_INDEX + 2,
-            // Clears the queue even if the sentinel is not committed.
-            COMMIT_INDEX + 3,
-        );
+        // Clears the queue even if the sentinel is not committed.
+        // Note this case is impossible in practise. We do not commit anything
+        // until the sentinel is committed.
+        daemon.state.lock().sentinel_commit_index = COMMIT_INDEX + 3;
+        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX + 2);
+
         assert_queue_len!(&daemon, 0);
         let at_index = COMMIT_INDEX + 2;
         assert_ticket_ready!(t2, VerifyAuthorityResult::Success(at_index));
@@ -654,7 +656,7 @@ mod tests {
         beat_ticker2.tick(beat2);
         // Run one iteration: one new tick is not enough.
         assert_queue_len!(&daemon, 1);
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
+        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
         assert_queue_len!(&daemon, 1);
 
         let t1 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
@@ -663,7 +665,7 @@ mod tests {
         assert_eq!(beat3.0, beat3_dup.0);
         // Run one iteration: one new tick for t0, zero for t1.
         assert_queue_len!(&daemon, 2);
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
+        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
         let t0 = assert_ticket_pending!(t0);
         let t1 = assert_ticket_pending!(t1);
         assert_queue_len!(&daemon, 2);
@@ -673,7 +675,7 @@ mod tests {
         beat_ticker3.tick(beat3_dup);
         // Run one iteration: two new ticks for t0, one for t1.
         assert_queue_len!(&daemon, 2);
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
+        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
         // t0 is out.
         assert_queue_len!(&daemon, 1);
         assert_ticket_ready!(t0, VerifyAuthorityResult::Success(COMMIT_INDEX));
@@ -687,7 +689,7 @@ mod tests {
         let beat4_newest = beat_ticker4.next_beat();
         // Run one iteration: two new ticks for t1, zero for t2.
         assert_queue_len!(&daemon, 2);
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
+        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
         // t1 is out.
         assert_queue_len!(&daemon, 1);
         assert_ticket_ready!(t1, VerifyAuthorityResult::Success(COMMIT_INDEX));
@@ -699,7 +701,7 @@ mod tests {
         beat_ticker4.tick(beat4);
         let t4 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
         assert_queue_len!(&daemon, 3);
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
+        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
         assert_queue_len!(&daemon, 3);
 
         // t2, t3 and t4 all receive beat4_newest.
@@ -709,7 +711,7 @@ mod tests {
         let beat1 = beat_ticker1.next_beat();
         beat_ticker1.tick(beat1);
         assert_queue_len!(&daemon, 3);
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
+        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
         // t2 is out
         assert_queue_len!(&daemon, 2);
         assert_ticket_ready!(t2, VerifyAuthorityResult::Success(COMMIT_INDEX));
@@ -723,7 +725,7 @@ mod tests {
         // Ancient beat
         beat_ticker2.tick(beat2_ancient);
         assert_queue_len!(&daemon, 2);
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
+        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
         // t3 is out
         assert_queue_len!(&daemon, 1);
         assert_ticket_ready!(t3, VerifyAuthorityResult::Success(COMMIT_INDEX));
@@ -736,7 +738,7 @@ mod tests {
         beat_ticker4.tick(beat_ticker4.next_beat());
         assert_queue_len!(&daemon, 1);
         // Continue clearing the queue even if we are at a new term.
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
+        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
         assert_queue_len!(&daemon, 0);
         assert_ticket_ready!(t4, VerifyAuthorityResult::Success(COMMIT_INDEX));
     }
@@ -744,15 +746,15 @@ mod tests {
     #[test]
     fn test_clear_ticked_requests_no_sentinel() {
         let daemon = init_daemon();
+        daemon.state.lock().sentinel_commit_index = COMMIT_INDEX + 1;
+
         let t0 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
         daemon.beat_tickers[3].tick(daemon.beat_tickers[3].next_beat());
         daemon.beat_tickers[4].tick(daemon.beat_tickers[4].next_beat());
         assert_queue_len!(&daemon, 1);
-        daemon.run_verify_authority_iteration(
-            TERM,
-            COMMIT_INDEX,
-            COMMIT_INDEX + 1, // Note: sentinel is not committed
-        );
+
+        // Note: sentinel is not committed.
+        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
         assert_queue_len!(&daemon, 1);
         assert_ticket_pending!(t0);
     }
@@ -764,11 +766,8 @@ mod tests {
         daemon.beat_tickers[3].tick(daemon.beat_tickers[3].next_beat());
         daemon.beat_tickers[4].tick(daemon.beat_tickers[4].next_beat());
         assert_queue_len!(&daemon, 1);
-        daemon.run_verify_authority_iteration(
-            NEXT_TERM, // Note: this is at the next term.
-            COMMIT_INDEX,
-            COMMIT_INDEX,
-        );
+        // Note: this is at the next term.
+        daemon.run_verify_authority_iteration(NEXT_TERM, COMMIT_INDEX);
         assert_queue_len!(&daemon, 0);
         assert_ticket_ready!(t0, VerifyAuthorityResult::Success(COMMIT_INDEX));
     }
@@ -780,7 +779,7 @@ mod tests {
         let _t1 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
         let t2 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
         assert_queue_len!(&daemon, 3);
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
+        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
         assert_queue_len!(&daemon, 3);
 
         {
@@ -793,7 +792,7 @@ mod tests {
         daemon.beat_tickers[0].tick(daemon.beat_tickers[0].next_beat());
         daemon.beat_tickers[1].tick(daemon.beat_tickers[1].next_beat());
         assert_queue_len!(&daemon, 1);
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
+        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
         assert_queue_len!(&daemon, 0);
 
         assert_ticket_ready!(t2, VerifyAuthorityResult::Success(COMMIT_INDEX));
@@ -822,25 +821,17 @@ mod tests {
         }
 
         // Run one iteration: no new commit, no new tick, for last term.
-        daemon.run_verify_authority_iteration(
-            PAST_TERM,
-            COMMIT_INDEX,
-            COMMIT_INDEX,
-        );
+        daemon.run_verify_authority_iteration(PAST_TERM, COMMIT_INDEX);
         // Tokens should stay as-is.
         assert_queue_len!(&daemon, 5);
 
         // Run one iteration: no new commit, no new tick, for this term.
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
+        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
         // Tokens should stay as-is.
         assert_queue_len!(&daemon, 5);
 
         // Run one iteration: no new commit, no new tick, for next term.
-        daemon.run_verify_authority_iteration(
-            NEXT_TERM,
-            COMMIT_INDEX,
-            COMMIT_INDEX,
-        );
+        daemon.run_verify_authority_iteration(NEXT_TERM, COMMIT_INDEX);
 
         assert_queue_len!(&daemon, 3);
         let queue = &daemon.state.lock().queue;
@@ -862,18 +853,14 @@ mod tests {
         let daemon = init_daemon();
 
         // Run of last term.
-        daemon.reset_state(PAST_TERM);
+        daemon.reset_state(PAST_TERM, COMMIT_INDEX - 1);
         let _t0 = daemon.verify_authority_async(PAST_TERM, COMMIT_INDEX - 2);
         let _t1 = daemon.verify_authority_async(PAST_TERM, COMMIT_INDEX - 1);
         let _t2 = daemon.verify_authority_async(PAST_TERM, COMMIT_INDEX);
-        daemon.run_verify_authority_iteration(
-            PAST_TERM,
-            COMMIT_INDEX - 1,
-            COMMIT_INDEX - 1,
-        );
+        daemon.run_verify_authority_iteration(PAST_TERM, COMMIT_INDEX - 1);
 
         // Run of current term.
-        daemon.reset_state(TERM);
+        daemon.reset_state(TERM, COMMIT_INDEX);
         let beat_ticker0 = daemon.beat_tickers[0].clone();
         let beat_ticker1 = daemon.beat_tickers[1].clone();
         let beat_ticker2 = daemon.beat_tickers[2].clone();
@@ -888,11 +875,7 @@ mod tests {
         assert_queue_len!(&daemon, 1);
 
         // Do nothing since sentinel is not committed yet.
-        daemon.run_verify_authority_iteration(
-            TERM,
-            COMMIT_INDEX - 1,
-            COMMIT_INDEX,
-        );
+        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX - 1);
         assert_queue_len!(&daemon, 1);
 
         // New request t1.
@@ -902,7 +885,7 @@ mod tests {
         assert_queue_len!(&daemon, 2);
 
         // Clear t0 but not t1.
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
+        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
         assert_queue_len!(&daemon, 1);
         // Cleared by the committed sentinel.
         assert_ticket_ready!(t0, VerifyAuthorityResult::Success(COMMIT_INDEX));
@@ -917,7 +900,7 @@ mod tests {
         assert_queue_len!(&daemon, 3);
 
         // Clear t1 and t2 because they are ticked.
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
+        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
         assert_queue_len!(&daemon, 1);
         // Note t0 and t1 have different commit indexes.
         assert_ticket_ready!(t1, VerifyAuthorityResult::Success(COMMIT_INDEX));
@@ -941,11 +924,8 @@ mod tests {
             t4.rough_time = Instant::now() - Duration::from_secs(1);
         }
         // Run for the next term.
-        daemon.run_verify_authority_iteration(
-            NEXT_TERM,
-            COMMIT_INDEX + 2,
-            COMMIT_INDEX + 2,
-        );
+        daemon.state.lock().sentinel_commit_index = COMMIT_INDEX + 2;
+        daemon.run_verify_authority_iteration(NEXT_TERM, COMMIT_INDEX + 2);
         assert_queue_len!(&daemon, 0);
         assert_ticket_ready!(
             t3,
@@ -959,7 +939,7 @@ mod tests {
         let daemon = init_daemon();
         // We were the leader at an earlier term.
         let stale_commit_index = COMMIT_INDEX;
-        let stale_sentinel_commit_index = COMMIT_INDEX;
+        let _stale_sentinel_commit_index = COMMIT_INDEX;
 
         // Then we lost leadership. Someone became the leader and created new
         // entries. Those entries are committed, but we did not know.
@@ -968,10 +948,10 @@ mod tests {
         // However, the new leader had answer queries at _prev_term_log_index.
 
         // We created a new sentinel, it is not yet committed.
-        let _sentinel_commit_index = COMMIT_INDEX + 3;
+        let sentinel_commit_index = COMMIT_INDEX + 3;
 
         // New term, we are the leader.
-        daemon.reset_state(NEXT_TERM);
+        daemon.reset_state(NEXT_TERM, sentinel_commit_index);
         let t = daemon.verify_authority_async(NEXT_TERM, COMMIT_INDEX);
 
         // We received 3 heartbeats.
@@ -983,12 +963,14 @@ mod tests {
         beat_ticker2.tick(beat_ticker2.next_beat());
 
         // We are now using stale data from the old term.
-        daemon.run_verify_authority_iteration(
-            TERM,
-            stale_commit_index,
-            stale_sentinel_commit_index,
+        daemon.run_verify_authority_iteration(TERM, stale_commit_index);
+        let t = assert_ticket_pending!(t);
+
+        // We are now using data from the new term.
+        daemon.run_verify_authority_iteration(NEXT_TERM, sentinel_commit_index);
+        assert_ticket_ready!(
+            t,
+            VerifyAuthorityResult::Success(sentinel_commit_index)
         );
-        // This is not right.
-        assert_ticket_ready!(t, VerifyAuthorityResult::Success(COMMIT_INDEX));
     }
 }