@@ -188,73 +188,10 @@ impl VerifyAuthorityDaemon {
             return;
         }
 
-        self.clear_committed_requests(current_term, commit_index);
         self.clear_ticked_requests(commit_index);
         self.remove_expired_requests(current_term);
     }
 
-    /// Clears all requests that have seen at least one commit.
-    /// This function handles the following scenario: a verify authority request
-    /// was received, when the `commit_index` was at C. Later as the leader we
-    /// moved the commit index to at least C+1. That implies that when the
-    /// request was first received, no other new commits after C could have been
-    /// added to the log, either by this replica or others. It then follows that
-    /// we can claim we had authority at that point.
-    fn clear_committed_requests(
-        &self,
-        current_term: Term,
-        commit_index: Index,
-    ) {
-        let mut state = self.state.lock();
-        // We might skip some requests that could have been cleared, if we did
-        // not react to the commit notification fast enough, and missed a
-        // commit. This is about the case where in the last iteration
-        // `commit_index` was `ci`, but in this iteration it becomes `ci + 2`
-        // (or even larger), skipping `ci + 1`.
-        //
-        // Obviously skipping a commit is a problem if `ci + 2` and `ci + 1` are
-        // both committed by us in this term. The requests that are cleared by
-        // `+1` will be cleared by `+2` anyway. Similarly it is not a problem if
-        // neither are committed by us in this term, since `+1` will not clear
-        // any requests.
-        //
-        // If `+2` is not committed by us, but `+1` is, we lose the opportunity
-        // to use `+1` to clear requests. The chances of losing this opportunity
-        // are slim, because between `+1` and `+2`, there has to be a missed
-        // heartbeat interval, and a new commit (`+2`) from another leader. We
-        // have plenty of time to run this method before `+2` reaches us.
-        //
-        // Overall it is acceptable to simplify the implementation and risk
-        // losing the mentioned opportunity.
-        if current_term != state.term {
-            return;
-        }
-
-        // Note the commit_index in the queue might not be in increasing order.
-        // We could still have requests that have a smaller commit_index after
-        // this sweep. That is an acceptable tradeoff we are taking.
-        while let Some(head) = state.queue.pop_front() {
-            if head.commit_index >= commit_index {
-                state.queue.push_front(head);
-                break;
-            }
-            // At the start of the term, the previous leader might have exposed
-            // all entries before the sentinel commit to clients. If a request
-            // arrived before the sentinel commit is committed, its commit index
-            // (token.commit_index) might be inaccurate. Thus we cannot allow
-            // the client to return any state before the sentinel index.
-            //
-            // We did not choose the sentinel index but opted for a more strict
-            // commit index, because the index is committed anyway. It should be
-            // delivered to the application really quickly. We paid the price
-            // with latency but made the request more fresh.
-            let _ = head
-                .sender
-                .send(VerifyAuthorityResult::Success(commit_index));
-            state.start.0 += 1;
-        }
-    }
-
     /// Fetches the newest successful RPC response from peers, and mark verify
     /// authority requests as complete if they are covered by more than half of
     /// the replicas.
@@ -608,54 +545,6 @@ mod tests {
         }
     }
 
-    #[test]
-    fn test_clear_committed_requests() {
-        let daemon = init_daemon();
-        let t0 = daemon.verify_authority_async(TERM, COMMIT_INDEX - 2);
-        let t1 = daemon.verify_authority_async(TERM, COMMIT_INDEX - 1);
-        let t2 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
-        let t3 = daemon.verify_authority_async(TERM, COMMIT_INDEX - 2);
-        let t4 = daemon.verify_authority_async(TERM, COMMIT_INDEX + 1);
-        // Run one iteration: no new commit, no new tick, for last term.
-        daemon.run_verify_authority_iteration(PAST_TERM, COMMIT_INDEX);
-        // Tokens should stay as-is.
-        assert_queue_len!(&daemon, 5);
-
-        // Run one iteration: no new commit, no new tick, for next term.
-        daemon.run_verify_authority_iteration(NEXT_TERM, COMMIT_INDEX);
-        // Tokens should stay as-is.
-        assert_queue_len!(&daemon, 5);
-
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
-        assert_queue_len!(&daemon, 3);
-        {
-            let queue = &daemon.state.lock().queue;
-            assert_eq!(queue[0].commit_index, COMMIT_INDEX);
-            // We can reply SUCCESS to this token, but it is not at the beginning of
-            // the queue, so we left it as-is.
-            assert_eq!(queue[1].commit_index, COMMIT_INDEX - 2);
-            assert_eq!(queue[2].commit_index, COMMIT_INDEX + 1);
-        }
-
-        assert_ticket_ready!(t0, VerifyAuthorityResult::Success(COMMIT_INDEX));
-        assert_ticket_ready!(t1, VerifyAuthorityResult::Success(COMMIT_INDEX));
-        let t2 = assert_ticket_pending!(t2);
-        let t3 = assert_ticket_pending!(t3);
-        let t4 = assert_ticket_pending!(t4);
-
-        // Clears the queue even if the sentinel is not committed.
-        // Note this case is impossible in practise. We do not commit anything
-        // until the sentinel is committed.
-        daemon.state.lock().sentinel_commit_index = COMMIT_INDEX + 3;
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX + 2);
-
-        assert_queue_len!(&daemon, 0);
-        let at_index = COMMIT_INDEX + 2;
-        assert_ticket_ready!(t2, VerifyAuthorityResult::Success(at_index));
-        assert_ticket_ready!(t3, VerifyAuthorityResult::Success(at_index));
-        assert_ticket_ready!(t4, VerifyAuthorityResult::Success(at_index));
-    }
-
     #[test]
     fn test_clear_ticked_requests() {
         let daemon = init_daemon();
@@ -906,7 +795,10 @@ mod tests {
         daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
         assert_queue_len!(&daemon, 1);
         // Cleared by the committed sentinel.
-        assert_ticket_ready!(t0, VerifyAuthorityResult::Success(COMMIT_INDEX));
+        assert_ticket_ready!(
+            t0,
+            VerifyAuthorityResult::Success(COMMIT_INDEX - 1)
+        );
 
         // New requests t2 and t3.
         let t2 = daemon.verify_authority_async(TERM, COMMIT_INDEX + 1);
@@ -962,7 +854,7 @@ mod tests {
         // Then we lost leadership. Someone became the leader and created new
         // entries. Those entries are committed, but we did not know.
        // So our commit index is not moved.
-        let _prev_term_log_index = COMMIT_INDEX + 2;
+        let prev_term_log_index = COMMIT_INDEX + 2;
        // However, the new leader had answer queries at _prev_term_log_index.
 
        // We created a new sentinel, it is not yet committed.
@@ -988,7 +880,7 @@ mod tests {
         daemon.run_verify_authority_iteration(NEXT_TERM, sentinel_commit_index);
         assert_ticket_ready!(
             t,
-            VerifyAuthorityResult::Success(sentinel_commit_index)
+            VerifyAuthorityResult::Success(prev_term_log_index)
         );
     }
 
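
For reference, below is a minimal, standalone sketch of the queue sweep that the removed `clear_committed_requests` performed. It is illustrative only: the `Index`, `VerifyAuthorityResult`, and `Request` definitions and the mpsc channel are simplified stand-ins for the daemon's real state, and the term guard and `start` bookkeeping from the original are omitted; only the pop-front/push-front sweep is shown.

// Simplified stand-ins for the daemon's types; not the crate's real definitions.
use std::collections::VecDeque;
use std::sync::mpsc;

type Index = usize;

#[derive(Debug, PartialEq)]
enum VerifyAuthorityResult {
    Success(Index),
}

struct Request {
    commit_index: Index,
    sender: mpsc::Sender<VerifyAuthorityResult>,
}

// Pop requests whose recorded commit index is older than the newly observed
// `commit_index` and answer each with the newer index. The first request that
// is not older stops the sweep, so later entries with a smaller commit index
// stay queued (the tradeoff noted in the removed function's comments).
fn clear_committed_requests(queue: &mut VecDeque<Request>, commit_index: Index) {
    while let Some(head) = queue.pop_front() {
        if head.commit_index >= commit_index {
            queue.push_front(head);
            break;
        }
        let _ = head.sender.send(VerifyAuthorityResult::Success(commit_index));
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut queue: VecDeque<Request> = [3, 4, 6, 2]
        .into_iter()
        .map(|commit_index| Request { commit_index, sender: tx.clone() })
        .collect();

    clear_committed_requests(&mut queue, 5);

    // Requests recorded at 3 and 4 are answered with Success(5); the request
    // at 6 stops the sweep, so the trailing request at 2 remains queued.
    assert_eq!(rx.try_recv(), Ok(VerifyAuthorityResult::Success(5)));
    assert_eq!(rx.try_recv(), Ok(VerifyAuthorityResult::Success(5)));
    assert_eq!(queue.len(), 2);
}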