|
|
@@ -488,7 +488,7 @@ mod tests {
|
|
|
|
|
|
macro_rules! assert_queue_len {
|
|
|
($daemon:expr, $len:expr) => {
|
|
|
- assert_eq!($daemon.state.lock().queue.len(), $len);
|
|
|
+ assert_eq!($len, $daemon.state.lock().queue.len());
|
|
|
};
|
|
|
}
|
|
|
|
|
|
@@ -621,6 +621,169 @@ mod tests {
|
|
|
assert_ticket_ready!(t4, VerifyAuthorityResult::Success(at_index));
|
|
|
}
|
|
|
|
|
|
+ #[test]
|
|
|
+ fn test_clear_ticked_requests() {
|
|
|
+ let daemon = init_daemon();
|
|
|
+ let beat_ticker0 = daemon.beat_tickers[0].clone();
|
|
|
+ let beat_ticker1 = daemon.beat_tickers[1].clone();
|
|
|
+ let beat_ticker2 = daemon.beat_tickers[2].clone();
|
|
|
+ let beat_ticker3 = daemon.beat_tickers[3].clone();
|
|
|
+ let beat_ticker4 = daemon.beat_tickers[4].clone();
|
|
|
+
|
|
|
+ // An ancient tick that will be ticked at the end of the test.
|
|
|
+ let beat2_ancient = beat_ticker2.ticked();
|
|
|
+
|
|
|
+ let t0 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
|
|
|
+ // t0 receives beat2.
|
|
|
+ let beat2 = beat_ticker2.next_beat();
|
|
|
+ beat_ticker2.tick(beat2);
|
|
|
+ // Run one iteration: one new tick is not enough.
|
|
|
+ assert_queue_len!(&daemon, 1);
|
|
|
+ daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
|
|
|
+ assert_queue_len!(&daemon, 1);
|
|
|
+
|
|
|
+ let t1 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
|
|
|
+ let beat3_dup = beat_ticker3.current_beat();
|
|
|
+ let beat3 = beat_ticker3.next_beat();
|
|
|
+ assert_eq!(beat3.0, beat3_dup.0);
|
|
|
+ // Run one iteration: one new tick for t0, zero for t1.
|
|
|
+ assert_queue_len!(&daemon, 2);
|
|
|
+ daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
|
|
|
+ let t0 = assert_ticket_pending!(t0);
|
|
|
+ let t1 = assert_ticket_pending!(t1);
|
|
|
+ assert_queue_len!(&daemon, 2);
|
|
|
+
|
|
|
+ // Tick the same beat twice. t0 and t1 receives beat3.
|
|
|
+ beat_ticker3.tick(beat3);
|
|
|
+ beat_ticker3.tick(beat3_dup);
|
|
|
+ // Run one iteration: two new ticks for t0, one for t1.
|
|
|
+ assert_queue_len!(&daemon, 2);
|
|
|
+ daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
|
|
|
+ // t0 is out.
|
|
|
+ assert_queue_len!(&daemon, 1);
|
|
|
+ assert_ticket_ready!(t0, VerifyAuthorityResult::Success(COMMIT_INDEX));
|
|
|
+ let t1 = assert_ticket_pending!(t1);
|
|
|
+
|
|
|
+ // t1 receives a beat from beat_ticker4.
|
|
|
+ beat_ticker4.next_beat(); // a lost beat.
|
|
|
+ beat_ticker4.tick(beat_ticker4.next_beat()); // an immediate beat.
|
|
|
+ let beat4 = beat_ticker4.next_beat();
|
|
|
+ let t2 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
|
|
|
+ let beat4_newest = beat_ticker4.next_beat();
|
|
|
+ // Run one iteration: two new ticks for t1, zero for t2.
|
|
|
+ assert_queue_len!(&daemon, 2);
|
|
|
+ daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
|
|
|
+ // t1 is out.
|
|
|
+ assert_queue_len!(&daemon, 1);
|
|
|
+ assert_ticket_ready!(t1, VerifyAuthorityResult::Success(COMMIT_INDEX));
|
|
|
+ let t2 = assert_ticket_pending!(t2);
|
|
|
+
|
|
|
+ let t3 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
|
|
|
+ let beat0 = beat_ticker0.next_beat();
|
|
|
+ // Not a new vote for t2: the beat is not recent enough.
|
|
|
+ beat_ticker4.tick(beat4);
|
|
|
+ let t4 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
|
|
|
+ assert_queue_len!(&daemon, 3);
|
|
|
+ daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
|
|
|
+ assert_queue_len!(&daemon, 3);
|
|
|
+
|
|
|
+ // t2, t3 and t4 all receive beat4_newest.
|
|
|
+ // Two new votes for t2, one for t3 and one for t4.
|
|
|
+ beat_ticker4.tick(beat4_newest);
|
|
|
+ let beat1_stale = beat_ticker1.next_beat();
|
|
|
+ let beat1 = beat_ticker1.next_beat();
|
|
|
+ beat_ticker1.tick(beat1);
|
|
|
+ assert_queue_len!(&daemon, 3);
|
|
|
+ daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
|
|
|
+ // t2 is out
|
|
|
+ assert_queue_len!(&daemon, 2);
|
|
|
+ assert_ticket_ready!(t2, VerifyAuthorityResult::Success(COMMIT_INDEX));
|
|
|
+ let t3 = assert_ticket_pending!(t3);
|
|
|
+ let t4 = assert_ticket_pending!(t4);
|
|
|
+
|
|
|
+ // New vote for t3, but not for t4.
|
|
|
+ beat_ticker0.tick(beat0);
|
|
|
+ // Stale beat for t3 and t4.
|
|
|
+ beat_ticker1.tick(beat1_stale);
|
|
|
+ // Ancient beat
|
|
|
+ beat_ticker2.tick(beat2_ancient);
|
|
|
+ assert_queue_len!(&daemon, 2);
|
|
|
+ daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
|
|
|
+ // t3 is out
|
|
|
+ assert_queue_len!(&daemon, 1);
|
|
|
+ assert_ticket_ready!(t3, VerifyAuthorityResult::Success(COMMIT_INDEX));
|
|
|
+ let t4 = assert_ticket_pending!(t4);
|
|
|
+
|
|
|
+ // Many new votes for t4.
|
|
|
+ beat_ticker1.tick(beat_ticker1.next_beat());
|
|
|
+ beat_ticker2.tick(beat_ticker2.next_beat());
|
|
|
+ beat_ticker3.tick(beat_ticker3.next_beat());
|
|
|
+ beat_ticker4.tick(beat_ticker4.next_beat());
|
|
|
+ assert_queue_len!(&daemon, 1);
|
|
|
+ // Continue clearing the queue even if we are at a new term.
|
|
|
+ daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
|
|
|
+ assert_queue_len!(&daemon, 0);
|
|
|
+ assert_ticket_ready!(t4, VerifyAuthorityResult::Success(COMMIT_INDEX));
|
|
|
+ }
|
|
|
+
|
|
|
+ #[test]
|
|
|
+ fn test_clear_ticked_requests_no_sentinel() {
|
|
|
+ let daemon = init_daemon();
|
|
|
+ let t0 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
|
|
|
+ daemon.beat_tickers[3].tick(daemon.beat_tickers[3].next_beat());
|
|
|
+ daemon.beat_tickers[4].tick(daemon.beat_tickers[4].next_beat());
|
|
|
+ assert_queue_len!(&daemon, 1);
|
|
|
+ daemon.run_verify_authority_iteration(
|
|
|
+ TERM,
|
|
|
+ COMMIT_INDEX,
|
|
|
+ COMMIT_INDEX + 1, // Note: sentinel is not committed
|
|
|
+ );
|
|
|
+ assert_queue_len!(&daemon, 1);
|
|
|
+ assert_ticket_pending!(t0);
|
|
|
+ }
|
|
|
+
|
|
|
+ #[test]
|
|
|
+ fn test_clear_ticked_requests_lost_leadership() {
|
|
|
+ let daemon = init_daemon();
|
|
|
+ let t0 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
|
|
|
+ daemon.beat_tickers[3].tick(daemon.beat_tickers[3].next_beat());
|
|
|
+ daemon.beat_tickers[4].tick(daemon.beat_tickers[4].next_beat());
|
|
|
+ assert_queue_len!(&daemon, 1);
|
|
|
+ daemon.run_verify_authority_iteration(
|
|
|
+ NEXT_TERM, // Note: this is at the next term.
|
|
|
+ COMMIT_INDEX,
|
|
|
+ COMMIT_INDEX,
|
|
|
+ );
|
|
|
+ assert_queue_len!(&daemon, 0);
|
|
|
+ assert_ticket_ready!(t0, VerifyAuthorityResult::Success(COMMIT_INDEX));
|
|
|
+ }
|
|
|
+
|
|
|
+ #[test]
|
|
|
+ fn test_clear_ticked_requests_cleared_by_others() {
|
|
|
+ let daemon = init_daemon();
|
|
|
+ let _t0 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
|
|
|
+ let _t1 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
|
|
|
+ let t2 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
|
|
|
+ assert_queue_len!(&daemon, 3);
|
|
|
+ daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
|
|
|
+ assert_queue_len!(&daemon, 3);
|
|
|
+
|
|
|
+ {
|
|
|
+ let mut state = daemon.state.lock();
|
|
|
+ state.start = QueueIndex(2);
|
|
|
+ state.queue.pop_front().expect("Queue should not be empty");
|
|
|
+ state.queue.pop_front().expect("Queue should not be empty");
|
|
|
+ }
|
|
|
+
|
|
|
+ daemon.beat_tickers[0].tick(daemon.beat_tickers[0].next_beat());
|
|
|
+ daemon.beat_tickers[1].tick(daemon.beat_tickers[1].next_beat());
|
|
|
+ assert_queue_len!(&daemon, 1);
|
|
|
+ daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
|
|
|
+ assert_queue_len!(&daemon, 0);
|
|
|
+
|
|
|
+ assert_ticket_ready!(t2, VerifyAuthorityResult::Success(COMMIT_INDEX));
|
|
|
+ }
|
|
|
+
|
|
|
#[test]
|
|
|
fn test_remove_expired_requests() {
|
|
|
let daemon = init_daemon();
|