verify_authority.rs 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935
  1. use std::collections::VecDeque;
  2. use std::future::Future;
  3. use std::sync::atomic::Ordering;
  4. use std::sync::Arc;
  5. use std::time::{Duration, Instant};
  6. use parking_lot::{Condvar, Mutex};
  7. use crate::beat_ticker::{Beat, SharedBeatTicker};
  8. use crate::heartbeats::HEARTBEAT_INTERVAL;
  9. use crate::{Index, Raft, Term};
  10. /// The result returned to a verify authority request.
  11. /// This request is not directly exposed to end users. Instead it is used
  12. /// internally to implement no-commit read-only requests.
  13. #[derive(Debug, Eq, PartialEq)]
  14. pub enum VerifyAuthorityResult {
  15. Success(Index),
  16. TermElapsed,
  17. TimedOut,
  18. }
  19. /// Token stored in the internal queue for authority verification. Each token
  20. /// represents one verification request.
  21. #[derive(Debug)]
  22. struct VerifyAuthorityToken {
  23. commit_index: Index,
  24. beats_moment: Vec<Beat>,
  25. rough_time: Instant,
  26. sender: tokio::sync::oneshot::Sender<VerifyAuthorityResult>,
  27. }
  28. #[derive(Clone, Copy, Debug, Eq, Ord, PartialOrd, PartialEq)]
  29. struct QueueIndex(usize);
  30. /// The state of this daemon, should bee protected by a mutex.
  31. struct VerifyAuthorityState {
  32. /// The current term. Might be behind the real term in the cluster.
  33. term: Term,
  34. /// Pending requests to verify authority.
  35. queue: VecDeque<VerifyAuthorityToken>,
  36. /// Number of requests that have been processed.
  37. start: QueueIndex,
  38. /// A vector of queue indexes. Each element in this vector indicates the
  39. /// index of the first request that has not been confirmed by the
  40. /// corresponding peer.
  41. /// These indexes include all processed requests. They will never go down.
  42. covered: Vec<QueueIndex>,
  43. /// The index of the first commit, created at the start of the term.
  44. sentinel_commit_index: Index,
  45. }
  46. impl VerifyAuthorityState {
  47. pub fn create(peer_count: usize) -> Self {
  48. VerifyAuthorityState {
  49. term: Term(0),
  50. queue: VecDeque::new(),
  51. start: QueueIndex(0),
  52. covered: vec![QueueIndex(0); peer_count],
  53. sentinel_commit_index: 0,
  54. }
  55. }
  56. pub fn reset(&mut self, term: Term, sentinel_commit_index: Index) {
  57. self.clear_tickets();
  58. self.term = term;
  59. self.start = QueueIndex(0);
  60. self.sentinel_commit_index = sentinel_commit_index;
  61. for item in self.covered.iter_mut() {
  62. *item = QueueIndex(0)
  63. }
  64. }
  65. pub fn clear_tickets(&mut self) {
  66. for token in self.queue.drain(..) {
  67. let _ = token.sender.send(VerifyAuthorityResult::TermElapsed);
  68. }
  69. }
  70. }
  71. #[derive(Clone)]
  72. pub(crate) struct DaemonBeatTicker {
  73. beat_ticker: SharedBeatTicker,
  74. condvar: Arc<Condvar>,
  75. }
  76. impl DaemonBeatTicker {
  77. pub fn next_beat(&self) -> Beat {
  78. self.beat_ticker.next_beat()
  79. }
  80. pub fn tick(&self, beat: Beat) {
  81. self.beat_ticker.tick(beat);
  82. self.condvar.notify_one();
  83. }
  84. }
  85. #[derive(Clone)]
  86. pub(crate) struct VerifyAuthorityDaemon {
  87. state: Arc<Mutex<VerifyAuthorityState>>,
  88. beat_tickers: Vec<SharedBeatTicker>,
  89. condvar: Arc<Condvar>,
  90. }
  91. impl VerifyAuthorityDaemon {
  92. pub fn create(peer_count: usize) -> Self {
  93. Self {
  94. state: Arc::new(Mutex::new(VerifyAuthorityState::create(
  95. peer_count,
  96. ))),
  97. beat_tickers: (0..peer_count)
  98. .map(|_| SharedBeatTicker::create())
  99. .collect(),
  100. condvar: Arc::new(Condvar::new()),
  101. }
  102. }
  103. pub fn wait_for(&self, timeout: Duration) {
  104. let mut guard = self.state.lock();
  105. self.condvar.wait_for(&mut guard, timeout);
  106. }
  107. pub fn reset_state(&self, term: Term, sentinel_commit_index: Index) {
  108. self.state.lock().reset(term, sentinel_commit_index);
  109. // Increase all beats by one to make sure upcoming verify authority
  110. // requests wait for beats in the current term. This in fact creates
  111. // phantom beats that will never be marked as completed by themselves.
  112. // They will be automatically `ticked()` when newer (real) beats are
  113. // created, sent and `ticked()`.
  114. for beat_ticker in self.beat_tickers.iter() {
  115. beat_ticker.next_beat();
  116. }
  117. }
  118. /// Enqueues a verify authority request. Returns a receiver of the
  119. /// verification result. Returns None if the term has passed.
  120. pub fn verify_authority_async(
  121. &self,
  122. current_term: Term,
  123. commit_index: Index,
  124. ) -> Option<tokio::sync::oneshot::Receiver<VerifyAuthorityResult>> {
  125. let mut state = self.state.lock();
  126. // The inflight beats are sent at least for `current_term`. This is
  127. // guaranteed by the fact that we immediately increase beats for all
  128. // peers after being elected, before releasing the "elected" message to
  129. // the rest of the Raft system. The newest beats we get here are at
  130. // least as new as the phantom beats created by `Self::reset_state()`.
  131. let beats_moment = self
  132. .beat_tickers
  133. .iter()
  134. .map(|beat_ticker| beat_ticker.current_beat())
  135. .collect();
  136. // The inflight beats could also be for any term after `current_term`.
  137. // We must check if the term stored in the daemon is the same as
  138. // `current_term`.
  139. // `state.term` could be smaller than `current_term`, if a new term is
  140. // started by someone else and we lost leadership.
  141. // `state.term` could be greater than `current_term`, if we lost
  142. // leadership but are elected leader again in a following term.
  143. // In both cases, we cannot confirm the leadership at `current_term`.
  144. if state.term != current_term {
  145. return None;
  146. }
  147. let (sender, receiver) = tokio::sync::oneshot::channel();
  148. let token = VerifyAuthorityToken {
  149. commit_index,
  150. beats_moment,
  151. rough_time: Instant::now(),
  152. sender,
  153. };
  154. state.queue.push_back(token);
  155. Some(receiver)
  156. }
  157. /// Run one iteration of the verify authority daemon.
  158. pub fn run_verify_authority_iteration(
  159. &self,
  160. current_term: Term,
  161. commit_index: Index,
  162. ) {
  163. // Opportunistic check: do nothing if we don't have any requests.
  164. if self.state.lock().queue.is_empty() {
  165. return;
  166. }
  167. self.clear_ticked_requests(commit_index);
  168. self.remove_expired_requests(current_term);
  169. }
  170. /// Fetches the newest successful RPC response from peers, and mark verify
  171. /// authority requests as complete if they are covered by more than half of
  172. /// the replicas.
  173. fn clear_ticked_requests(&self, commit_index: Index) {
  174. // Do not use ticks to clear requests if we have not committed at least
  175. // one log entry since the start of the term. At the start of the term,
  176. // the leader might not know the commit index of the previous leader.
  177. // This holds true even it is guaranteed that all entries committed by
  178. // the previous leader will be committed by the current leader.
  179. //
  180. // Similarly, if the sentinel is not committed, the leader cannot know
  181. // if all entries of the previous leader will be committed, in case the
  182. // leadership is lost before any commits can be made. Thus, the leader
  183. // cannot answer any queries before the sentinel is committed.
  184. if commit_index < self.state.lock().sentinel_commit_index {
  185. return;
  186. }
  187. for (peer_index, beat_ticker) in self.beat_tickers.iter().enumerate() {
  188. // Fetches the newest successful RPC response from the current peer.
  189. let ticked = beat_ticker.ticked();
  190. let mut state = self.state.lock();
  191. // Update progress with `ticked`. All requests that came before
  192. // `ticked` now have one more votes of leader authority from the
  193. // current peer.
  194. let first_not_ticked_index = state.queue.partition_point(|token| {
  195. token.beats_moment[peer_index] <= ticked
  196. });
  197. let new_covered = first_not_ticked_index + state.start.0;
  198. if new_covered < state.covered[peer_index].0 {
  199. log::error!(
  200. "Ticked index moving backwards from {} to {} for peer {}",
  201. state.covered[peer_index].0,
  202. new_covered,
  203. peer_index,
  204. );
  205. }
  206. assert!(new_covered >= state.covered[peer_index].0);
  207. state.covered[peer_index].0 = new_covered;
  208. // Count the requests that has more than N / 2 votes. We always have
  209. // the vote from ourselves, but the value is 0 in `covered` array.
  210. let mut sorted_covered = state.covered.to_owned();
  211. sorted_covered.sort_unstable();
  212. let mid = sorted_covered.len() / 2 + 1;
  213. let new_start = sorted_covered[mid];
  214. // `state.start` could have been moved by other means, e.g. by a
  215. // subsequent commit of the same term after the beat is issued.
  216. // Then the relevant verify authority requests have been processed.
  217. // If all ticked requests have been processed, nothing needs to be
  218. // done. Skip to the next iteration.
  219. if new_start <= state.start {
  220. continue;
  221. }
  222. // All requests before `new_start` is now verified.
  223. let verified = new_start.0 - state.start.0;
  224. let sentinel_commit_index = state.sentinel_commit_index;
  225. for token in state.queue.drain(..verified) {
  226. let mut cnt = 0;
  227. for (index, beat) in token.beats_moment.iter().enumerate() {
  228. if self.beat_tickers[index].ticked() >= *beat {
  229. cnt += 1;
  230. }
  231. }
  232. if cnt + cnt + 1 < self.beat_tickers.len() {
  233. log::error!("Token {:?} is not covered", token);
  234. }
  235. assert!(cnt + cnt + 1 >= self.beat_tickers.len());
  236. // Never verify authority before the sentinel commit index. The
  237. // previous leader might have exposed data up to the commit
  238. // right before the sentinel.
  239. let allowed_index =
  240. if sentinel_commit_index > token.commit_index {
  241. // sentinel_commit_index cannot be at 0 after the `if`.
  242. sentinel_commit_index - 1
  243. } else {
  244. token.commit_index
  245. };
  246. let _ = token
  247. .sender
  248. .send(VerifyAuthorityResult::Success(allowed_index));
  249. }
  250. // Move the queue starting point.
  251. state.start = new_start;
  252. }
  253. }
  254. const VERIFY_AUTHORITY_REQUEST_EXPIRATION: Duration =
  255. Duration::from_millis(HEARTBEAT_INTERVAL.as_millis() as u64 * 2);
  256. /// Remove expired requests if we are no longer the leader.
  257. /// If we have lost leadership, we are unlikely to receive confirmations
  258. /// of past leadership state from peers. Requests are expired after two
  259. /// heartbeat period have passed. We do not immediately cancel all incoming
  260. /// requests, in hope that we could still answer them accurately without
  261. /// breaking the consistency guarantee.
  262. fn remove_expired_requests(&self, current_term: Term) {
  263. let mut state = self.state.lock();
  264. // Return if we are still the leader, or we become the leader again.
  265. //
  266. // Note that we do not hold the main raft state lock, thus the value of
  267. // `current_term` might not be up-to-date. We only update `state.term`
  268. // after an election. If in a term after `current_term`, we are elected
  269. // leader again, `state.term` could be updated and thus greater than the
  270. // (now stale) `current_term`. In that case, the queue should have been
  271. // reset. There will be no expired request to remove.
  272. if state.term >= current_term {
  273. return;
  274. }
  275. let expiring_line =
  276. Instant::now() - Self::VERIFY_AUTHORITY_REQUEST_EXPIRATION;
  277. // Assuming bounded clock skew, otherwise we will lose efficiency.
  278. let expired =
  279. |head: &VerifyAuthorityToken| head.rough_time < expiring_line;
  280. // Note rough_time might not be in increasing order, so we might still
  281. // have requests that are expired in the queue after the sweep.
  282. while state.queue.front().is_some_and(expired) {
  283. state
  284. .queue
  285. .pop_front()
  286. .map(|head| head.sender.send(VerifyAuthorityResult::TimedOut));
  287. state.start.0 += 1;
  288. }
  289. }
  290. pub fn beat_ticker(&self, peer_index: usize) -> DaemonBeatTicker {
  291. DaemonBeatTicker {
  292. beat_ticker: self.beat_tickers[peer_index].clone(),
  293. condvar: self.condvar.clone(),
  294. }
  295. }
  296. pub fn kill(&self) {
  297. let term = self.state.lock().term;
  298. // Fail all inflight verify authority requests. It is important to do
  299. // this so that the RPC framework could drop requests served by us and
  300. // release all references to the Raft instance.
  301. self.reset_state(term, Index::MAX);
  302. self.condvar.notify_all();
  303. }
  304. }
  305. impl<Command: 'static + Send> Raft<Command> {
  306. const BEAT_RECORDING_MAX_PAUSE: Duration = Duration::from_millis(20);
  307. /// Create a thread and runs the verify authority daemon.
  308. pub(crate) fn run_verify_authority_daemon(&self) -> impl FnOnce() {
  309. let me = self.me;
  310. let keep_running = self.keep_running.clone();
  311. let this_daemon = self.verify_authority_daemon.clone();
  312. let rf = self.inner_state.clone();
  313. move || {
  314. log::info!("{:?} verify authority daemon running ...", me);
  315. while keep_running.load(Ordering::Relaxed) {
  316. this_daemon.wait_for(Self::BEAT_RECORDING_MAX_PAUSE);
  317. let (current_term, commit_index) = {
  318. let rf = rf.lock();
  319. (rf.current_term, rf.commit_index)
  320. };
  321. this_daemon
  322. .run_verify_authority_iteration(current_term, commit_index);
  323. }
  324. log::info!("{:?} verify authority daemon done.", me);
  325. }
  326. }
  327. /// Create a verify authority request. Returns None if we are not the
  328. /// leader.
  329. ///
  330. /// A successful verification allows the application to respond to read-only
  331. /// requests that arrived before this function is called. The answer must
  332. /// include all commands at or before a certain index, which is returned to
  333. /// the application with the successful verification result. The index is
  334. /// in fact the commit index at the moment this function was called. It is
  335. /// guaranteed that no other commands could possibly have been committed at
  336. /// the moment this function was called.
  337. ///
  338. /// The application is also free to include any subsequent commits in the
  339. /// response. Consistency is still guaranteed, because Raft never rolls back
  340. /// committed commands.
  341. pub fn verify_authority_async(
  342. &self,
  343. ) -> Option<impl Future<Output = crate::VerifyAuthorityResult>> {
  344. // Fail the request if we have been killed.
  345. if !self.keep_running.load(Ordering::Relaxed) {
  346. return None;
  347. }
  348. let (term, commit_index, last_index) = {
  349. let rf = self.inner_state.lock();
  350. if !rf.is_leader() {
  351. // Returning none instead of `Pending::Ready(TermElapsed)`,
  352. // because that requires a separate struct that implements
  353. // Future, which is tedious to write.
  354. return None;
  355. }
  356. (
  357. rf.current_term,
  358. rf.commit_index,
  359. rf.log.last_index_term().index,
  360. )
  361. };
  362. let receiver = self
  363. .verify_authority_daemon
  364. .verify_authority_async(term, commit_index);
  365. let force_heartbeat = commit_index == last_index;
  366. self.heartbeats_daemon.trigger(force_heartbeat);
  367. receiver.map(|receiver| async move {
  368. receiver
  369. .await
  370. .expect("Verify authority daemon never drops senders")
  371. })
  372. }
  373. }
  374. #[cfg(test)]
  375. mod tests {
  376. use super::*;
  377. const PEER_SIZE: usize = 5;
  378. const PAST_TERM: Term = Term(2);
  379. const TERM: Term = Term(3);
  380. const NEXT_TERM: Term = Term(4);
  381. const COMMIT_INDEX: Index = 8;
  382. fn init_daemon() -> VerifyAuthorityDaemon {
  383. let daemon = VerifyAuthorityDaemon::create(PEER_SIZE);
  384. daemon.reset_state(TERM, COMMIT_INDEX);
  385. const CURRENT_BEATS: [u64; 5] = [11, 9, 7, 5, 3];
  386. const TICKED: [u64; 5] = [0, 3, 1, 4, 2];
  387. for (index, beat_ticker) in daemon.beat_tickers.iter().enumerate() {
  388. for _ in 1..(PEER_SIZE - index) * 2 {
  389. beat_ticker.next_beat();
  390. }
  391. beat_ticker.tick(Beat((index * 3 % PEER_SIZE) as u64));
  392. assert_eq!(Beat(CURRENT_BEATS[index]), beat_ticker.current_beat());
  393. assert_eq!(Beat(TICKED[index]), beat_ticker.ticked());
  394. }
  395. daemon
  396. }
  397. macro_rules! assert_queue_len {
  398. ($daemon:expr, $len:expr) => {
  399. assert_eq!($len, $daemon.state.lock().queue.len());
  400. };
  401. }
  402. macro_rules! assert_ticket_ready {
  403. ($t:expr, $e:expr) => {{
  404. let mut receiver = $t.expect("Ticket should be valid");
  405. let result = receiver
  406. .try_recv()
  407. .expect("The receiver should be ready with the result");
  408. assert_eq!(result, $e);
  409. Some(receiver);
  410. }};
  411. }
  412. macro_rules! assert_ticket_pending {
  413. ($t:expr) => {{
  414. let mut receiver = $t.expect("Ticket should be valid");
  415. let err = receiver
  416. .try_recv()
  417. .expect_err("The receiver should not be ready");
  418. assert_eq!(err, tokio::sync::oneshot::error::TryRecvError::Empty);
  419. Some(receiver)
  420. }};
  421. }
  422. #[test]
  423. fn test_verify_authority_async() {
  424. let daemon = init_daemon();
  425. let ticket = daemon.verify_authority_async(TERM, COMMIT_INDEX);
  426. ticket.expect("Getting ticket should not fail immediately");
  427. {
  428. let state = daemon.state.lock();
  429. assert_eq!(1, state.queue.len());
  430. #[allow(clippy::get_first)]
  431. let token = state.queue.get(0).unwrap();
  432. assert_eq!(
  433. [Beat(11), Beat(9), Beat(7), Beat(5), Beat(3)],
  434. token.beats_moment.as_slice()
  435. );
  436. assert_eq!(COMMIT_INDEX, token.commit_index);
  437. }
  438. daemon.beat_ticker(4).next_beat();
  439. daemon.beat_ticker(2).next_beat();
  440. daemon.verify_authority_async(TERM, COMMIT_INDEX + 10);
  441. {
  442. let state = daemon.state.lock();
  443. assert_eq!(2, state.queue.len());
  444. let token = state.queue.get(1).unwrap();
  445. assert_eq!(
  446. [Beat(11), Beat(9), Beat(8), Beat(5), Beat(4)],
  447. token.beats_moment.as_slice()
  448. );
  449. assert_eq!(COMMIT_INDEX + 10, token.commit_index);
  450. }
  451. }
  452. #[test]
  453. fn test_verify_authority_async_term_mismatch() {
  454. let daemon = init_daemon();
  455. let ticket =
  456. daemon.verify_authority_async(Term(TERM.0 + 1), COMMIT_INDEX);
  457. assert!(
  458. ticket.is_none(),
  459. "Should not issue a ticket for future terms"
  460. );
  461. let ticket =
  462. daemon.verify_authority_async(Term(TERM.0 - 1), COMMIT_INDEX);
  463. assert!(ticket.is_none(), "Should not issue a ticket for past terms");
  464. {
  465. let state = daemon.state.lock();
  466. assert_eq!(0, state.queue.len());
  467. }
  468. }
  469. #[test]
  470. fn test_reset_state() {
  471. let daemon = init_daemon();
  472. let t0 = daemon.verify_authority_async(TERM, COMMIT_INDEX - 2);
  473. let t1 = daemon.verify_authority_async(TERM, COMMIT_INDEX - 1);
  474. let t2 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
  475. daemon.reset_state(NEXT_TERM, COMMIT_INDEX + 1);
  476. const CURRENT_BEATS: [u64; 5] = [12, 10, 8, 6, 4];
  477. for (index, beat_ticker) in daemon.beat_tickers.iter().enumerate() {
  478. assert_eq!(CURRENT_BEATS[index], beat_ticker.current_beat().0);
  479. }
  480. assert_ticket_ready!(t0, VerifyAuthorityResult::TermElapsed);
  481. assert_ticket_ready!(t1, VerifyAuthorityResult::TermElapsed);
  482. assert_ticket_ready!(t2, VerifyAuthorityResult::TermElapsed);
  483. let state = daemon.state.lock();
  484. assert_eq!(0, state.queue.len());
  485. assert_eq!(0, state.start.0);
  486. assert_eq!(COMMIT_INDEX + 1, state.sentinel_commit_index);
  487. assert_eq!(NEXT_TERM, state.term);
  488. for covered in &state.covered {
  489. assert_eq!(0, covered.0);
  490. }
  491. }
  492. #[test]
  493. fn test_clear_ticked_requests() {
  494. let daemon = init_daemon();
  495. let beat_ticker0 = daemon.beat_tickers[0].clone();
  496. let beat_ticker1 = daemon.beat_tickers[1].clone();
  497. let beat_ticker2 = daemon.beat_tickers[2].clone();
  498. let beat_ticker3 = daemon.beat_tickers[3].clone();
  499. let beat_ticker4 = daemon.beat_tickers[4].clone();
  500. // An ancient tick that will be ticked at the end of the test.
  501. let beat2_ancient = beat_ticker2.ticked();
  502. let t0 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
  503. // t0 receives beat2.
  504. let beat2 = beat_ticker2.next_beat();
  505. beat_ticker2.tick(beat2);
  506. // Run one iteration: one new tick is not enough.
  507. assert_queue_len!(&daemon, 1);
  508. daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
  509. assert_queue_len!(&daemon, 1);
  510. let t1 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
  511. let beat3_dup = beat_ticker3.current_beat();
  512. let beat3 = beat_ticker3.next_beat();
  513. assert_eq!(beat3.0, beat3_dup.0);
  514. // Run one iteration: one new tick for t0, zero for t1.
  515. assert_queue_len!(&daemon, 2);
  516. daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
  517. let t0 = assert_ticket_pending!(t0);
  518. let t1 = assert_ticket_pending!(t1);
  519. assert_queue_len!(&daemon, 2);
  520. // Tick the same beat twice. t0 and t1 receives beat3.
  521. beat_ticker3.tick(beat3);
  522. beat_ticker3.tick(beat3_dup);
  523. // Run one iteration: two new ticks for t0, one for t1.
  524. assert_queue_len!(&daemon, 2);
  525. daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
  526. // t0 is out.
  527. assert_queue_len!(&daemon, 1);
  528. assert_ticket_ready!(t0, VerifyAuthorityResult::Success(COMMIT_INDEX));
  529. let t1 = assert_ticket_pending!(t1);
  530. // t1 receives a beat from beat_ticker4.
  531. beat_ticker4.next_beat(); // a lost beat.
  532. beat_ticker4.tick(beat_ticker4.next_beat()); // an immediate beat.
  533. let beat4 = beat_ticker4.next_beat();
  534. let t2 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
  535. let beat4_newest = beat_ticker4.next_beat();
  536. // Run one iteration: two new ticks for t1, zero for t2.
  537. assert_queue_len!(&daemon, 2);
  538. daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
  539. // t1 is out.
  540. assert_queue_len!(&daemon, 1);
  541. assert_ticket_ready!(t1, VerifyAuthorityResult::Success(COMMIT_INDEX));
  542. let t2 = assert_ticket_pending!(t2);
  543. let t3 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
  544. let beat0 = beat_ticker0.next_beat();
  545. // Not a new vote for t2: the beat is not recent enough.
  546. beat_ticker4.tick(beat4);
  547. let t4 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
  548. assert_queue_len!(&daemon, 3);
  549. daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
  550. assert_queue_len!(&daemon, 3);
  551. // t2, t3 and t4 all receive beat4_newest.
  552. // Two new votes for t2, one for t3 and one for t4.
  553. beat_ticker4.tick(beat4_newest);
  554. let beat1_stale = beat_ticker1.next_beat();
  555. let beat1 = beat_ticker1.next_beat();
  556. beat_ticker1.tick(beat1);
  557. assert_queue_len!(&daemon, 3);
  558. daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
  559. // t2 is out
  560. assert_queue_len!(&daemon, 2);
  561. assert_ticket_ready!(t2, VerifyAuthorityResult::Success(COMMIT_INDEX));
  562. let t3 = assert_ticket_pending!(t3);
  563. let t4 = assert_ticket_pending!(t4);
  564. // New vote for t3, but not for t4.
  565. beat_ticker0.tick(beat0);
  566. // Stale beat for t3 and t4.
  567. beat_ticker1.tick(beat1_stale);
  568. // Ancient beat
  569. beat_ticker2.tick(beat2_ancient);
  570. assert_queue_len!(&daemon, 2);
  571. daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
  572. // t3 is out
  573. assert_queue_len!(&daemon, 1);
  574. assert_ticket_ready!(t3, VerifyAuthorityResult::Success(COMMIT_INDEX));
  575. let t4 = assert_ticket_pending!(t4);
  576. // Many new votes for t4.
  577. beat_ticker1.tick(beat_ticker1.next_beat());
  578. beat_ticker2.tick(beat_ticker2.next_beat());
  579. beat_ticker3.tick(beat_ticker3.next_beat());
  580. beat_ticker4.tick(beat_ticker4.next_beat());
  581. assert_queue_len!(&daemon, 1);
  582. // Continue clearing the queue even if we are at a new term.
  583. daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
  584. assert_queue_len!(&daemon, 0);
  585. assert_ticket_ready!(t4, VerifyAuthorityResult::Success(COMMIT_INDEX));
  586. }
  587. #[test]
  588. fn test_clear_ticked_requests_no_sentinel() {
  589. let daemon = init_daemon();
  590. daemon.state.lock().sentinel_commit_index = COMMIT_INDEX + 1;
  591. let t0 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
  592. daemon.beat_tickers[3].tick(daemon.beat_tickers[3].next_beat());
  593. daemon.beat_tickers[4].tick(daemon.beat_tickers[4].next_beat());
  594. assert_queue_len!(&daemon, 1);
  595. // Note: sentinel is not committed.
  596. daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
  597. assert_queue_len!(&daemon, 1);
  598. assert_ticket_pending!(t0);
  599. }
  600. #[test]
  601. fn test_clear_ticked_requests_lost_leadership() {
  602. let daemon = init_daemon();
  603. let t0 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
  604. daemon.beat_tickers[3].tick(daemon.beat_tickers[3].next_beat());
  605. daemon.beat_tickers[4].tick(daemon.beat_tickers[4].next_beat());
  606. assert_queue_len!(&daemon, 1);
  607. // Note: this is at the next term.
  608. daemon.run_verify_authority_iteration(NEXT_TERM, COMMIT_INDEX);
  609. assert_queue_len!(&daemon, 0);
  610. assert_ticket_ready!(t0, VerifyAuthorityResult::Success(COMMIT_INDEX));
  611. }
  612. #[test]
  613. fn test_clear_ticked_requests_cleared_by_others() {
  614. let daemon = init_daemon();
  615. let _t0 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
  616. let _t1 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
  617. let t2 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
  618. assert_queue_len!(&daemon, 3);
  619. daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
  620. assert_queue_len!(&daemon, 3);
  621. {
  622. let mut state = daemon.state.lock();
  623. state.start = QueueIndex(2);
  624. state.queue.pop_front().expect("Queue should not be empty");
  625. state.queue.pop_front().expect("Queue should not be empty");
  626. }
  627. daemon.beat_tickers[0].tick(daemon.beat_tickers[0].next_beat());
  628. daemon.beat_tickers[1].tick(daemon.beat_tickers[1].next_beat());
  629. assert_queue_len!(&daemon, 1);
  630. daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
  631. assert_queue_len!(&daemon, 0);
  632. assert_ticket_ready!(t2, VerifyAuthorityResult::Success(COMMIT_INDEX));
  633. }
  634. #[test]
  635. fn test_remove_expired_requests() {
  636. let daemon = init_daemon();
  637. let t0 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
  638. let t1 = daemon.verify_authority_async(TERM, COMMIT_INDEX + 2);
  639. let t2 = daemon.verify_authority_async(TERM, COMMIT_INDEX + 1);
  640. let t3 = daemon.verify_authority_async(TERM, COMMIT_INDEX + 2);
  641. let t4 = daemon.verify_authority_async(TERM, COMMIT_INDEX + 1);
  642. // Override rough_time to test correctness.
  643. let now = Instant::now();
  644. {
  645. let mut state = daemon.state.lock();
  646. assert_eq!(5, state.queue.len());
  647. state.queue[0].rough_time = now - Duration::from_millis(1000);
  648. state.queue[1].rough_time = now - Duration::from_millis(500);
  649. state.queue[2].rough_time = now - Duration::from_millis(10);
  650. state.queue[3].rough_time = now - Duration::from_millis(1000);
  651. state.queue[4].rough_time = now;
  652. }
  653. // Run one iteration: no new commit, no new tick, for last term.
  654. daemon.run_verify_authority_iteration(PAST_TERM, COMMIT_INDEX);
  655. // Tokens should stay as-is.
  656. assert_queue_len!(&daemon, 5);
  657. // Run one iteration: no new commit, no new tick, for this term.
  658. daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
  659. // Tokens should stay as-is.
  660. assert_queue_len!(&daemon, 5);
  661. // Run one iteration: no new commit, no new tick, for next term.
  662. daemon.run_verify_authority_iteration(NEXT_TERM, COMMIT_INDEX);
  663. assert_queue_len!(&daemon, 3);
  664. let queue = &daemon.state.lock().queue;
  665. assert_eq!(queue[0].rough_time, now - Duration::from_millis(10));
  666. // The token actually expired, but we should not remove it since it is
  667. // not at the beginning of the queue.
  668. assert_eq!(queue[1].rough_time, now - Duration::from_millis(1000));
  669. assert_eq!(queue[2].rough_time, now);
  670. assert_ticket_ready!(t0, VerifyAuthorityResult::TimedOut);
  671. assert_ticket_ready!(t1, VerifyAuthorityResult::TimedOut);
  672. assert_ticket_pending!(t2);
  673. assert_ticket_pending!(t3);
  674. assert_ticket_pending!(t4);
  675. }
  /// Drives `run_verify_authority_iteration` through a past term, the current
  /// term and the next term, checking after each step how many tickets are
  /// still queued and which result every resolved ticket carries.
  #[test]
  fn test_run_verify_authority_iteration() {
      let daemon = init_daemon();

      // ---- Past term ----
      // Three requests are enqueued and one iteration runs in PAST_TERM. The
      // tickets are kept alive (but never inspected) until the test ends.
      daemon.reset_state(PAST_TERM, COMMIT_INDEX - 1);
      let _past0 = daemon.verify_authority_async(PAST_TERM, COMMIT_INDEX - 2);
      let _past1 = daemon.verify_authority_async(PAST_TERM, COMMIT_INDEX - 1);
      let _past2 = daemon.verify_authority_async(PAST_TERM, COMMIT_INDEX);
      daemon.run_verify_authority_iteration(PAST_TERM, COMMIT_INDEX - 1);

      // ---- Current term ----
      daemon.reset_state(TERM, COMMIT_INDEX);
      let tickers = &daemon.beat_tickers[..5];
      // Ticks the beat ticker of `peer` with that ticker's next beat, i.e.
      // gives one more vote to every request that is waiting on it.
      let vote_from = |peer: usize| {
          let ticker = &tickers[peer];
          ticker.tick(ticker.next_beat());
      };

      // t0 arrives and collects two votes.
      let t0 = daemon.verify_authority_async(TERM, COMMIT_INDEX - 1);
      vote_from(0);
      vote_from(1);
      assert_queue_len!(&daemon, 1);
      // The sentinel is not committed yet, so nothing may be resolved.
      daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX - 1);
      assert_queue_len!(&daemon, 1);

      // t1 arrives and collects one vote.
      let t1 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
      vote_from(1);
      assert_queue_len!(&daemon, 2);
      // With the sentinel committed, t0 is resolved while t1 stays queued.
      daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
      assert_queue_len!(&daemon, 1);
      assert_ticket_ready!(
          t0,
          VerifyAuthorityResult::Success(COMMIT_INDEX - 1)
      );

      // t2 arrives; afterwards t1 has two votes and t2 has one.
      let t2 = daemon.verify_authority_async(TERM, COMMIT_INDEX + 1);
      vote_from(2);
      // t3 arrives; afterwards t1 has three votes, t2 two and t3 one.
      let t3 = daemon.verify_authority_async(TERM, COMMIT_INDEX + 1);
      vote_from(3);
      assert_queue_len!(&daemon, 3);
      // t1 and t2 have been ticked often enough to be resolved.
      daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
      assert_queue_len!(&daemon, 1);
      // Each ticket reports its own commit index, so t1 differs from t0.
      assert_ticket_ready!(t1, VerifyAuthorityResult::Success(COMMIT_INDEX));
      assert_ticket_ready!(
          t2,
          VerifyAuthorityResult::Success(COMMIT_INDEX + 1)
      );

      // t4 arrives; afterwards t3 has two votes and t4 has one.
      let t4 = daemon.verify_authority_async(TERM, COMMIT_INDEX + 1);
      vote_from(4);
      // Backdate both remaining tokens (t3 at the front of the queue, t4 at
      // the back) by one second so that they count as expired.
      {
          let mut guard = daemon.state.lock();
          let head =
              guard.queue.front_mut().expect("Queue should not be empty");
          head.rough_time = Instant::now() - Duration::from_secs(1);
          let tail = guard.queue.back_mut().expect("Queue should not be empty");
          tail.rough_time = Instant::now() - Duration::from_secs(1);
      }

      // ---- Next term ----
      // Move the sentinel forward and run one iteration in NEXT_TERM. The
      // queue must drain: t3 is expected to succeed, t4 to time out.
      let next_term_index = COMMIT_INDEX + 2;
      daemon.state.lock().sentinel_commit_index = next_term_index;
      daemon.run_verify_authority_iteration(NEXT_TERM, next_term_index);
      assert_queue_len!(&daemon, 0);
      assert_ticket_ready!(
          t3,
          VerifyAuthorityResult::Success(COMMIT_INDEX + 1)
      );
      assert_ticket_ready!(t4, VerifyAuthorityResult::TimedOut);
  }
  /// Edge case: the daemon is first run with the term and commit index of an
  /// earlier leadership and only afterwards with the data of the new term.
  /// The ticket must stay pending after the stale run and is expected to
  /// resolve to the last log index of the previous term after the fresh run.
  #[test]
  fn test_edge_case_stale_sentinel() {
      // Scenario, in chronological order.
      // We were the leader at an earlier term with these indexes.
      let old_commit_index = COMMIT_INDEX;
      let _old_sentinel_commit_index = COMMIT_INDEX;
      // We then lost leadership. Another leader created new entries and
      // answered queries at this index. The entries are committed, but we did
      // not learn about it, so our commit index has not moved.
      let last_index_of_prev_term = COMMIT_INDEX + 2;
      // We are the leader again and created a new sentinel, which is not
      // committed yet.
      let new_sentinel_index = COMMIT_INDEX + 3;

      let daemon = init_daemon();
      // New term, we are the leader.
      daemon.reset_state(NEXT_TERM, new_sentinel_index);
      let t = daemon.verify_authority_async(NEXT_TERM, COMMIT_INDEX);

      // Heartbeats from three peers arrive.
      for ticker in &daemon.beat_tickers[..3] {
          ticker.tick(ticker.next_beat());
      }

      // An iteration that still uses stale data from the old term must leave
      // the ticket untouched.
      daemon.run_verify_authority_iteration(TERM, old_commit_index);
      let t = assert_ticket_pending!(t);

      // An iteration that uses data from the new term resolves the ticket.
      daemon.run_verify_authority_iteration(NEXT_TERM, new_sentinel_index);
      assert_ticket_ready!(
          t,
          VerifyAuthorityResult::Success(last_index_of_prev_term)
      );
  }
  /// Edge case: requests are enqueued out of order with respect to their
  /// commit indexes, and the daemon itself runs with a commit index that is
  /// older than the newest request. The ticket that saw the stale commit index
  /// is still expected to resolve to the last log index of the previous term.
  #[test]
  fn test_edge_case_stale_commit_index() {
      // Scenario, in chronological order.
      // The previous leader created two new entries after COMMIT_INDEX and
      // answered queries at COMMIT_INDEX + 2. The entries are committed, but
      // we did not learn about it, so our commit index has not moved.
      let last_index_of_prev_term = COMMIT_INDEX + 2;
      // We created a new sentinel, which is not committed yet.
      let sentinel_index = COMMIT_INDEX + 3;
      // Request `t` arrives and observes the stale commit index.
      let commit_index_seen_by_t = COMMIT_INDEX;
      // The daemon is triggered and observes the sentinel index.
      let commit_index_seen_by_daemon = sentinel_index;
      // One more request arrives and observes an even newer commit index.
      let newest_commit_index = sentinel_index + 1;

      let daemon = init_daemon();
      // New term, we are the leader.
      daemon.reset_state(TERM, sentinel_index);

      // The tricky part is the out-of-order enqueue: the request carrying the
      // newest commit index enters the queue before `t`. Its ticket is not
      // needed and is dropped right away.
      drop(daemon.verify_authority_async(TERM, newest_commit_index));
      let t = daemon.verify_authority_async(TERM, commit_index_seen_by_t);

      // Heartbeats from three peers arrive.
      for ticker in &daemon.beat_tickers[..3] {
          ticker.tick(ticker.next_beat());
      }

      // The daemon runs with its own, already outdated, view of the new term.
      daemon.run_verify_authority_iteration(TERM, commit_index_seen_by_daemon);
      assert_ticket_ready!(
          t,
          VerifyAuthorityResult::Success(last_index_of_prev_term)
      );
  }
  825. }