verify_authority.rs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. use crate::beat_ticker::{Beat, SharedBeatTicker};
  2. use crate::daemon_env::Daemon;
  3. use crate::{Index, Raft, Term, HEARTBEAT_INTERVAL_MILLIS};
  4. use parking_lot::{Condvar, Mutex};
  5. use std::collections::VecDeque;
  6. use std::future::Future;
  7. use std::sync::atomic::Ordering;
  8. use std::sync::Arc;
  9. use std::time::{Duration, Instant};
  10. /// The result returned to a verify authority request.
  11. /// This request is not directly exposed to end users. Instead it is used
  12. /// internally to implement no-commit read-only requests.
  13. #[derive(Debug)]
  14. pub enum VerifyAuthorityResult {
  15. Success(Index),
  16. TermElapsed,
  17. TimedOut,
  18. }
  19. /// Token stored in the internal queue for authority verification. Each token
  20. /// represents one verification request.
  21. #[derive(Debug)]
  22. struct VerifyAuthorityToken {
  23. commit_index: Index,
  24. beats_moment: Vec<Beat>,
  25. rough_time: Instant,
  26. sender: tokio::sync::oneshot::Sender<VerifyAuthorityResult>,
  27. }
  28. #[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialOrd, PartialEq)]
  29. struct QueueIndex(usize);
  30. /// The state of this daemon, should bee protected by a mutex.
  31. struct VerifyAuthorityState {
  32. /// The current term. Might be behind the real term in the cluster.
  33. term: Term,
  34. /// Pending requests to verify authority.
  35. queue: VecDeque<VerifyAuthorityToken>,
  36. /// Number of requests that have been processed.
  37. start: QueueIndex,
  38. /// A vector of queue indexes. Each element in this vector indicates the
  39. /// index of the first request that has not been confirmed by the
  40. /// corresponding peer.
  41. /// These indexes include all processed requests. They will never go down.
  42. covered: Vec<QueueIndex>,
  43. }
  44. impl VerifyAuthorityState {
  45. pub fn create(peer_count: usize) -> Self {
  46. VerifyAuthorityState {
  47. term: Term(0),
  48. queue: Default::default(),
  49. start: QueueIndex(0),
  50. covered: vec![QueueIndex(0); peer_count],
  51. }
  52. }
  53. pub fn reset(&mut self, term: Term) {
  54. self.clear_tickets();
  55. self.term = term;
  56. self.start = QueueIndex(0);
  57. for item in self.covered.iter_mut() {
  58. *item = QueueIndex(0)
  59. }
  60. }
  61. pub fn clear_tickets(&mut self) {
  62. for token in self.queue.drain(..) {
  63. let _ = token.sender.send(VerifyAuthorityResult::TermElapsed);
  64. }
  65. }
  66. }
  67. #[derive(Clone)]
  68. pub(crate) struct DaemonBeatTicker {
  69. beat_ticker: SharedBeatTicker,
  70. condvar: Arc<Condvar>,
  71. }
  72. impl DaemonBeatTicker {
  73. pub fn next_beat(&self) -> Beat {
  74. let beat = self.beat_ticker.next_beat();
  75. beat
  76. }
  77. pub fn tick(&self, beat: Beat) {
  78. self.beat_ticker.tick(beat);
  79. self.condvar.notify_one();
  80. }
  81. }
  82. #[derive(Clone)]
  83. pub(crate) struct VerifyAuthorityDaemon {
  84. state: Arc<Mutex<VerifyAuthorityState>>,
  85. beat_tickers: Vec<SharedBeatTicker>,
  86. condvar: Arc<Condvar>,
  87. }
  88. impl VerifyAuthorityDaemon {
  89. pub fn create(peer_count: usize) -> Self {
  90. Self {
  91. state: Arc::new(Mutex::new(VerifyAuthorityState::create(
  92. peer_count,
  93. ))),
  94. beat_tickers: (0..peer_count)
  95. .map(|_| SharedBeatTicker::create())
  96. .collect(),
  97. condvar: Arc::new(Condvar::new()),
  98. }
  99. }
  100. pub fn wait_for(&self, timeout: Duration) {
  101. let mut guard = self.state.lock();
  102. self.condvar.wait_for(&mut guard, timeout);
  103. }
  104. pub fn reset_state(&self, term: Term) {
  105. self.state.lock().reset(term);
  106. // Increase all beats by one to make sure upcoming verify authority
  107. // requests wait for beats in the current term. This in fact creates
  108. // phantom beats that will never be marked as completed by themselves.
  109. // They will be automatically `ticked()` when newer (real) beats are
  110. // created, sent and `ticked()`.
  111. for beat_ticker in self.beat_tickers.iter() {
  112. beat_ticker.next_beat();
  113. }
  114. }
  115. /// Enqueues a verify authority request. Returns a receiver of the
  116. /// verification result. Returns None if the term has passed.
  117. pub fn verify_authority_async(
  118. &self,
  119. current_term: Term,
  120. commit_index: Index,
  121. ) -> Option<tokio::sync::oneshot::Receiver<VerifyAuthorityResult>> {
  122. let mut state = self.state.lock();
  123. // The inflight beats are sent at least for `current_term`. This is
  124. // guaranteed by the fact that we immediately increase beats for all
  125. // peers after being elected, before releasing the "elected" message to
  126. // the rest of the Raft system. The newest beats we get here are at
  127. // least as new as the phantom beats created by `Self::reset_state()`.
  128. let beats_moment = self
  129. .beat_tickers
  130. .iter()
  131. .map(|beat_ticker| beat_ticker.current_beat())
  132. .collect();
  133. // The inflight beats could also be for any term after `current_term`.
  134. // We must check if the term stored in the daemon is the same as
  135. // `current_term`.
  136. // `state.term` could be smaller than `current_term`, if a new term is
  137. // started by someone else and we lost leadership.
  138. // `state.term` could be greater than `current_term`, if we lost
  139. // leadership but are elected leader again in a following term.
  140. // In both cases, we cannot confirm the leadership at `current_term`.
  141. if state.term != current_term {
  142. return None;
  143. }
  144. let (sender, receiver) = tokio::sync::oneshot::channel();
  145. let token = VerifyAuthorityToken {
  146. commit_index,
  147. beats_moment,
  148. rough_time: Instant::now(),
  149. sender,
  150. };
  151. state.queue.push_back(token);
  152. Some(receiver)
  153. }
  154. /// Run one iteration of the verify authority daemon.
  155. pub fn run_verify_authority_iteration(
  156. &self,
  157. current_term: Term,
  158. commit_index: Index,
  159. sentinel_commit_index: Index,
  160. ) {
  161. // Opportunistic check: do nothing if we don't have any requests.
  162. if self.state.lock().queue.is_empty() {
  163. return;
  164. }
  165. self.clear_committed_requests(current_term, commit_index);
  166. // Do not use ticks to clear requests if we have not committed at least
  167. // one log entry since the start of the term. At the start of the term,
  168. // the leader might not know the commit index of the previous leader.
  169. // This holds true even it is guaranteed that all entries committed by
  170. // the previous leader will be committed by the current leader.
  171. if commit_index >= sentinel_commit_index {
  172. self.clear_ticked_requests();
  173. }
  174. self.removed_expired_requests(current_term);
  175. }
  176. /// Clears all requests that have seen at least one commit.
  177. /// This function handles the following scenario: a verify authority request
  178. /// was received, when the `commit_index` was at C. Later as the leader we
  179. /// moved the commit index to at least C+1. That implies that when the
  180. /// request was first received, no other new commits after C could have been
  181. /// added to the log, either by this replica or others. It then follows that
  182. /// we can claim we had authority at that point.
  183. fn clear_committed_requests(
  184. &self,
  185. current_term: Term,
  186. commit_index: Index,
  187. ) {
  188. let mut state = self.state.lock();
  189. // We might skip some requests that could have been cleared, if we did
  190. // not react to the commit notification fast enough, and missed a
  191. // commit. This is about the case where in the last iteration
  192. // `commit_index` was `ci`, but in this iteration it becomes `ci + 2`
  193. // (or even larger), skipping `ci + 1`.
  194. //
  195. // Obviously skipping a commit is a problem if `ci + 2` and `ci + 1` are
  196. // both committed by us in this term. The requests that are cleared by
  197. // `+1` will be cleared by `+2` anyway. Similarly it is not a problem if
  198. // neither are committed by us in this term, since `+1` will not clear
  199. // any requests.
  200. //
  201. // If `+2` is not committed by us, but `+1` is, we lose the opportunity
  202. // to use `+1` to clear requests. The chances of losing this opportunity
  203. // are slim, because between `+1` and `+2`, there has to be a missed
  204. // heartbeat interval, and a new commit (`+2`) from another leader. We
  205. // have plenty of time to run this method before `+2` reaches us.
  206. //
  207. // Overall it is acceptable to simplify the implementation and risk
  208. // losing the mentioned opportunity.
  209. if current_term != state.term {
  210. return;
  211. }
  212. // Note the commit_index in the queue might not be in increasing order.
  213. // We could still have requests that have a smaller commit_index after
  214. // this sweep. That is an acceptable tradeoff we are taking.
  215. while let Some(head) = state.queue.pop_front() {
  216. if head.commit_index >= commit_index {
  217. state.queue.push_front(head);
  218. break;
  219. }
  220. // At the start of the term, the previous leader might have exposed
  221. // all entries before the sentinel commit to clients. If a request
  222. // arrived before the sentinel commit is committed, its commit index
  223. // (token.commit_index) might be inaccurate. Thus we cannot allow
  224. // the client to return any state before the sentinel index.
  225. //
  226. // We did not choose the sentinel index but opted for a more strict
  227. // commit index, because the index is committed anyway. It should be
  228. // delivered to the application really quickly. We paid the price
  229. // with latency but made the request more fresh.
  230. let _ = head
  231. .sender
  232. .send(VerifyAuthorityResult::Success(commit_index));
  233. state.start.0 += 1;
  234. }
  235. }
  236. /// Fetches the newest successful RPC response from peers, and mark verify
  237. /// authority requests as complete if they are covered by more than half of
  238. /// the replicas.
  239. fn clear_ticked_requests(&self) {
  240. for (peer_index, beat_ticker) in self.beat_tickers.iter().enumerate() {
  241. // Fetches the newest successful RPC response from the current peer.
  242. let ticked = beat_ticker.ticked();
  243. let mut state = self.state.lock();
  244. // Update progress with `ticked`. All requests that came before
  245. // `ticked` now have one more votes of leader authority from the
  246. // current peer.
  247. let first_not_ticked_index = state.queue.partition_point(|token| {
  248. token.beats_moment[peer_index] <= ticked
  249. });
  250. let new_covered = first_not_ticked_index + state.start.0;
  251. assert!(new_covered >= state.covered[peer_index].0);
  252. state.covered[peer_index].0 = new_covered;
  253. // Count the requests that has more than N / 2 votes. We always have
  254. // the vote from ourselves, but the value is 0 in `covered` array.
  255. let mut sorted_covered = state.covered.to_owned();
  256. sorted_covered.sort_unstable();
  257. let mid = sorted_covered.len() / 2 + 1;
  258. let new_start = sorted_covered[mid];
  259. // `state.start` could have been moved by other means, e.g. by a
  260. // subsequent commit of the same term after the beat is issued.
  261. // Then the relevant verify authority requests have been processed.
  262. // If all ticked requests have been processed, nothing needs to be
  263. // done. Skip to the next iteration.
  264. if new_start <= state.start {
  265. continue;
  266. }
  267. // All requests before `new_start` is now verified.
  268. let verified = new_start.0 - state.start.0;
  269. for token in state.queue.drain(..verified) {
  270. let mut cnt = 0;
  271. for (index, beat) in token.beats_moment.iter().enumerate() {
  272. if self.beat_tickers[index].ticked() >= *beat {
  273. cnt += 1;
  274. }
  275. }
  276. assert!(cnt + cnt + 1 >= self.beat_tickers.len());
  277. let _ = token
  278. .sender
  279. .send(VerifyAuthorityResult::Success(token.commit_index));
  280. }
  281. // Move the queue starting point.
  282. state.start = new_start;
  283. }
  284. }
  285. const VERIFY_AUTHORITY_REQUEST_EXPIRATION: Duration =
  286. Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS * 2);
  287. /// Remove expired requests if we are no longer the leader.
  288. /// If we have lost leadership, we are unlikely to receive confirmations
  289. /// of past leadership state from peers. Requests are expired after two
  290. /// heartbeat period have passed. We do not immediately cancel all incoming
  291. /// requests, in hope that we could still answer them accurately without
  292. /// breaking the consistency guarantee.
  293. fn removed_expired_requests(&self, current_term: Term) {
  294. let mut state = self.state.lock();
  295. // Return if we are still the leader, or we become the leader again.
  296. //
  297. // Note that we do not hold the main raft state lock, thus the value of
  298. // `current_term` might not be up-to-date. We only update `state.term`
  299. // after an election. If in a term after `current_term`, we are elected
  300. // leader again, `state.term` could be updated and thus greater than the
  301. // (now stale) `current_term`. In that case, the queue should have been
  302. // reset. There will be no expired request to remove.
  303. if state.term >= current_term {
  304. return;
  305. }
  306. let expiring_line =
  307. Instant::now() - Self::VERIFY_AUTHORITY_REQUEST_EXPIRATION;
  308. // Assuming bounded clock skew, otherwise we will lose efficiency.
  309. let expired =
  310. |head: &VerifyAuthorityToken| head.rough_time < expiring_line;
  311. // Note rough_time might not be in increasing order, so we might still
  312. // have requests that are expired in the queue after the sweep.
  313. while state.queue.front().map_or(false, expired) {
  314. state
  315. .queue
  316. .pop_front()
  317. .map(|head| head.sender.send(VerifyAuthorityResult::TimedOut));
  318. state.start.0 += 1;
  319. }
  320. }
  321. pub fn beat_ticker(&self, peer_index: usize) -> DaemonBeatTicker {
  322. DaemonBeatTicker {
  323. beat_ticker: self.beat_tickers[peer_index].clone(),
  324. condvar: self.condvar.clone(),
  325. }
  326. }
  327. pub fn kill(&self) {
  328. let term = self.state.lock().term;
  329. // Fail all inflight verify authority requests. It is important to do
  330. // this so that the RPC framework could drop requests served by us and
  331. // release all references to the Raft instance.
  332. self.reset_state(term);
  333. self.condvar.notify_all();
  334. }
  335. }
  336. impl<Command: 'static + Send> Raft<Command> {
  337. const BEAT_RECORDING_MAX_PAUSE: Duration = Duration::from_millis(20);
  338. /// Create a thread and runs the verify authority daemon.
  339. pub(crate) fn run_verify_authority_daemon(&self) {
  340. let me = self.me.clone();
  341. let keep_running = self.keep_running.clone();
  342. let daemon_env = self.daemon_env.clone();
  343. let this_daemon = self.verify_authority_daemon.clone();
  344. let rf = self.inner_state.clone();
  345. let stop_wait_group = self.stop_wait_group.clone();
  346. let join_handle = std::thread::spawn(move || {
  347. // Note: do not change this to `let _ = ...`.
  348. let _guard = daemon_env.for_scope();
  349. log::info!("{:?} verify authority daemon running ...", me);
  350. while keep_running.load(Ordering::Acquire) {
  351. this_daemon.wait_for(Self::BEAT_RECORDING_MAX_PAUSE);
  352. let (current_term, commit_index, sentinel) = {
  353. let rf = rf.lock();
  354. (rf.current_term, rf.commit_index, rf.sentinel_commit_index)
  355. };
  356. this_daemon.run_verify_authority_iteration(
  357. current_term,
  358. commit_index,
  359. sentinel,
  360. );
  361. }
  362. log::info!("{:?} verify authority daemon done.", me);
  363. drop(stop_wait_group);
  364. });
  365. self.daemon_env
  366. .watch_daemon(Daemon::VerifyAuthority, join_handle);
  367. }
  368. /// Create a verify authority request. Returns None if we are not the
  369. /// leader.
  370. ///
  371. /// A successful verification allows the application to respond to read-only
  372. /// requests that arrived before this function is called. The answer must
  373. /// include all commands at or before a certain index, which is returned to
  374. /// the application with the successful verification result. The index is
  375. /// in fact the commit index at the moment this function was called. It is
  376. /// guaranteed that no other commands could possibly have been committed at
  377. /// the moment this function was called.
  378. ///
  379. /// The application is also free to include any subsequent commits in the
  380. /// response. Consistency is still guaranteed, because Raft never rolls back
  381. /// committed commands.
  382. pub fn verify_authority_async(
  383. &self,
  384. ) -> Option<impl Future<Output = crate::VerifyAuthorityResult>> {
  385. // Fail the request if we have been killed.
  386. if !self.keep_running.load(Ordering::Acquire) {
  387. return None;
  388. }
  389. let (term, commit_index) = {
  390. let rf = self.inner_state.lock();
  391. if !rf.is_leader() {
  392. // Returning none instead of `Pending::Ready(TermElapsed)`,
  393. // because that requires a separate struct that implements
  394. // Future, which is tedious to write.
  395. return None;
  396. }
  397. (rf.current_term, rf.commit_index)
  398. };
  399. let receiver = self
  400. .verify_authority_daemon
  401. .verify_authority_async(term, commit_index);
  402. self.heartbeats_daemon.trigger();
  403. receiver.map(|receiver| async move {
  404. receiver
  405. .await
  406. .expect("Verify authority daemon never drops senders")
  407. })
  408. }
  409. pub(crate) fn beat_ticker(&self, peer_index: usize) -> DaemonBeatTicker {
  410. self.verify_authority_daemon.beat_ticker(peer_index)
  411. }
  412. }