|
|
@@ -11,7 +11,7 @@ use std::time::{Duration, Instant};
|
|
|
/// The result returned to a verify authority request.
|
|
|
/// This request is not directly exposed to end users. Instead it is used
|
|
|
/// internally to implement no-commit read-only requests.
|
|
|
-#[derive(Debug)]
|
|
|
+#[derive(Debug, Eq, PartialEq)]
|
|
|
pub enum VerifyAuthorityResult {
|
|
|
Success(Index),
|
|
|
TermElapsed,
|
|
|
@@ -191,7 +191,7 @@ impl VerifyAuthorityDaemon {
|
|
|
if commit_index >= sentinel_commit_index {
|
|
|
self.clear_ticked_requests();
|
|
|
}
|
|
|
- self.removed_expired_requests(current_term);
|
|
|
+ self.remove_expired_requests(current_term);
|
|
|
}
|
|
|
|
|
|
/// Clears all requests that have seen at least one commit.
|
|
|
@@ -318,7 +318,7 @@ impl VerifyAuthorityDaemon {
|
|
|
/// heartbeat period have passed. We do not immediately cancel all incoming
|
|
|
/// requests, in hope that we could still answer them accurately without
|
|
|
/// breaking the consistency guarantee.
|
|
|
- fn removed_expired_requests(&self, current_term: Term) {
|
|
|
+ fn remove_expired_requests(&self, current_term: Term) {
|
|
|
let mut state = self.state.lock();
|
|
|
// Return if we are still the leader, or we become the leader again.
|
|
|
//
|
|
|
@@ -483,6 +483,32 @@ mod tests {
|
|
|
daemon
|
|
|
}
|
|
|
|
|
|
+ macro_rules! assert_queue_len {
|
|
|
+ ($daemon:expr, $len:expr) => {
|
|
|
+ assert_eq!($daemon.state.lock().queue.len(), $len);
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ macro_rules! assert_ticket_ready {
|
|
|
+ ($t:expr, $e:expr) => {{
|
|
|
+ let mut receiver = $t.expect("Ticket should be valid");
|
|
|
+ let result = receiver
|
|
|
+ .try_recv()
|
|
|
+ .expect("The receiver should be ready with the result");
|
|
|
+ assert_eq!(result, $e);
|
|
|
+ }};
|
|
|
+ }
|
|
|
+
|
|
|
+ macro_rules! assert_ticket_pending {
|
|
|
+ ($t:expr) => {{
|
|
|
+ let mut receiver = $t.expect("Ticket should be valid");
|
|
|
+ let err = receiver
|
|
|
+ .try_recv()
|
|
|
+ .expect_err("The receiver should not be ready");
|
|
|
+ assert_eq!(err, tokio::sync::oneshot::error::TryRecvError::Empty);
|
|
|
+ }};
|
|
|
+ }
|
|
|
+
|
|
|
#[test]
|
|
|
fn test_verify_authority_async() {
|
|
|
let daemon = init_daemon();
|
|
|
@@ -533,4 +559,64 @@ mod tests {
|
|
|
assert_eq!(0, state.queue.len());
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ #[test]
|
|
|
+ fn test_remove_expired_requests() {
|
|
|
+ let daemon = init_daemon();
|
|
|
+ let t0 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
|
|
|
+ let t1 = daemon.verify_authority_async(TERM, COMMIT_INDEX + 2);
|
|
|
+ let t2 = daemon.verify_authority_async(TERM, COMMIT_INDEX + 1);
|
|
|
+ let t3 = daemon.verify_authority_async(TERM, COMMIT_INDEX + 2);
|
|
|
+ let t4 = daemon.verify_authority_async(TERM, COMMIT_INDEX + 1);
|
|
|
+
|
|
|
+ // Override rough_time to test correctness.
|
|
|
+ let now = Instant::now();
|
|
|
+ {
|
|
|
+ let mut state = daemon.state.lock();
|
|
|
+ assert_eq!(5, state.queue.len());
|
|
|
+
|
|
|
+ state.queue[0].rough_time = now - Duration::from_millis(1000);
|
|
|
+ state.queue[1].rough_time = now - Duration::from_millis(500);
|
|
|
+ state.queue[2].rough_time = now - Duration::from_millis(10);
|
|
|
+ state.queue[3].rough_time = now - Duration::from_millis(1000);
|
|
|
+ state.queue[4].rough_time = now;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Run one iteration: no new commit, no new tick, for last term.
|
|
|
+ let past_term = Term(TERM.0 - 1);
|
|
|
+ daemon.run_verify_authority_iteration(
|
|
|
+ past_term,
|
|
|
+ COMMIT_INDEX,
|
|
|
+ COMMIT_INDEX,
|
|
|
+ );
|
|
|
+ // Tokens should stay as-is.
|
|
|
+ assert_queue_len!(&daemon, 5);
|
|
|
+
|
|
|
+ // Run one iteration: no new commit, no new tick, for this term.
|
|
|
+ daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX, COMMIT_INDEX);
|
|
|
+ // Tokens should stay as-is.
|
|
|
+ assert_queue_len!(&daemon, 5);
|
|
|
+
|
|
|
+ // Run one iteration: no new commit, no new tick, for next term.
|
|
|
+ let next_term = Term(TERM.0 + 1);
|
|
|
+ daemon.run_verify_authority_iteration(
|
|
|
+ next_term,
|
|
|
+ COMMIT_INDEX,
|
|
|
+ COMMIT_INDEX,
|
|
|
+ );
|
|
|
+
|
|
|
+ assert_queue_len!(&daemon, 3);
|
|
|
+ let queue = &daemon.state.lock().queue;
|
|
|
+ assert_eq!(queue[0].rough_time, now - Duration::from_millis(10));
|
|
|
+ // The token actually expired, but we should not remove it since it is
|
|
|
+ // not at the beginning of the queue.
|
|
|
+ assert_eq!(queue[1].rough_time, now - Duration::from_millis(1000));
|
|
|
+ assert_eq!(queue[2].rough_time, now);
|
|
|
+
|
|
|
+ assert_ticket_ready!(t0, VerifyAuthorityResult::TimedOut);
|
|
|
+ assert_ticket_ready!(t1, VerifyAuthorityResult::TimedOut);
|
|
|
+ assert_ticket_pending!(t2);
|
|
|
+ assert_ticket_pending!(t3);
|
|
|
+ assert_ticket_pending!(t4);
|
|
|
+ }
|
|
|
}
|