verify_authority.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. use crate::beat_ticker::{Beat, SharedBeatTicker};
  2. use crate::daemon_env::Daemon;
  3. use crate::{Index, Raft, Term, HEARTBEAT_INTERVAL_MILLIS};
  4. use crossbeam_utils::sync::{Parker, Unparker};
  5. use parking_lot::Mutex;
  6. use std::collections::VecDeque;
  7. use std::future::Future;
  8. use std::sync::atomic::Ordering;
  9. use std::sync::Arc;
  10. use std::time::{Duration, Instant};
  11. /// The result returned to a verify authority request.
  12. /// This request is not directly exposed to end users. Instead it is used
  13. /// internally to implement no-commit read-only requests.
  14. pub enum VerifyAuthorityResult {
  15. Success(Index),
  16. TermElapsed,
  17. TimedOut,
  18. }
  19. /// Token stored in the internal queue for authority verification. Each token
  20. /// represents one verification request.
  21. struct VerifyAuthorityToken {
  22. commit_index: Index,
  23. beats_moment: Vec<Beat>,
  24. rough_time: Instant,
  25. sender: tokio::sync::oneshot::Sender<VerifyAuthorityResult>,
  26. }
  /// Absolute position of a request in the verification queue, counted from
  /// the start of the current term.
  #[derive(Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord)]
  struct QueueIndex(usize);
  29. /// The state of this daemon, should bee protected by a mutex.
  30. struct VerifyAuthorityState {
  31. /// The current term. Might be behind the real term in the cluster.
  32. term: Term,
  33. /// Pending requests to verify authority.
  34. queue: VecDeque<VerifyAuthorityToken>,
  35. /// Number of requests that have been processed.
  36. start: QueueIndex,
  37. /// A vector of queue indexes. Each element in this vector indicates the
  38. /// index of the first request that has not been confirmed by the
  39. /// corresponding peer.
  40. /// These indexes include all processed requests. They will never go down.
  41. covered: Vec<QueueIndex>,
  42. }
  43. impl VerifyAuthorityState {
  44. pub fn create(peer_count: usize) -> Self {
  45. VerifyAuthorityState {
  46. term: Term(0),
  47. queue: Default::default(),
  48. start: QueueIndex(0),
  49. covered: vec![QueueIndex(0); peer_count],
  50. }
  51. }
  52. pub fn reset(&mut self, term: Term) {
  53. self.clear_tickets();
  54. self.term = term;
  55. self.start = QueueIndex(0);
  56. for item in self.covered.iter_mut() {
  57. *item = QueueIndex(0)
  58. }
  59. }
  60. pub fn clear_tickets(&mut self) {
  61. for token in self.queue.drain(..) {
  62. let _ = token.sender.send(VerifyAuthorityResult::TermElapsed);
  63. }
  64. }
  65. }
  66. #[derive(Clone)]
  67. pub(crate) struct VerifyAuthorityDaemon {
  68. state: Arc<Mutex<VerifyAuthorityState>>,
  69. beat_tickers: Vec<SharedBeatTicker>,
  70. unparker: Option<Unparker>,
  71. }
  72. impl VerifyAuthorityDaemon {
  73. pub fn create(peer_count: usize) -> Self {
  74. Self {
  75. state: Arc::new(Mutex::new(VerifyAuthorityState::create(
  76. peer_count,
  77. ))),
  78. beat_tickers: (0..peer_count)
  79. .map(|_| SharedBeatTicker::create())
  80. .collect(),
  81. unparker: None,
  82. }
  83. }
  84. pub fn reset_state(&self, term: Term) {
  85. self.state.lock().reset(term);
  86. // Increase all beats by one to make sure upcoming verify authority
  87. // requests wait for beats in the current term. This in fact creates
  88. // phantom beats that will never be marked as completed by themselves.
  89. // They will be automatically `ticked()` when newer (real) beats are
  90. // created, sent and `ticked()`.
  91. for beat_ticker in self.beat_tickers.iter() {
  92. beat_ticker.next_beat();
  93. }
  94. }
  95. /// Enqueues a verify authority request. Returns a receiver of the
  96. /// verification result. Returns None if the term has passed.
  97. pub fn verify_authority_async(
  98. &self,
  99. current_term: Term,
  100. commit_index: Index,
  101. ) -> Option<tokio::sync::oneshot::Receiver<VerifyAuthorityResult>> {
  102. // The inflight beats are sent at least for `current_term`. This is
  103. // guaranteed by the fact that we immediately increase beats for all
  104. // peers after being elected, before releasing the "elected" message to
  105. // the rest of the Raft system. The newest beats we get here are at
  106. // least as new as the phantom beats created by `Self::reset_state()`.
  107. let beats_moment = self
  108. .beat_tickers
  109. .iter()
  110. .map(|beat_ticker| beat_ticker.current_beat())
  111. .collect();
  112. // The inflight beats could also be for any term after `current_term`.
  113. // We must check if the term stored in the daemon is the same as
  114. // `current_term`.
  115. // `state.term` could be smaller than `current_term`, if a new term is
  116. // started by someone else and we lost leadership.
  117. // `state.term` could be greater than `current_term`, if we lost
  118. // leadership but are elected leader again in a following term.
  119. // In both cases, we cannot confirm the leadership at `current_term`.
  120. let mut state = self.state.lock();
  121. if state.term != current_term {
  122. return None;
  123. }
  124. let (sender, receiver) = tokio::sync::oneshot::channel();
  125. let token = VerifyAuthorityToken {
  126. commit_index,
  127. beats_moment,
  128. rough_time: Instant::now(),
  129. sender,
  130. };
  131. state.queue.push_back(token);
  132. Some(receiver)
  133. }
  134. /// Run one iteration of the verify authority daemon.
  135. pub fn run_verify_authority_iteration(
  136. &self,
  137. current_term: Term,
  138. commit_index: Index,
  139. sentinel_commit_index: Index,
  140. ) {
  141. // Opportunistic check: do nothing if we don't have any requests.
  142. if self.state.lock().queue.is_empty() {
  143. return;
  144. }
  145. self.clear_committed_requests(current_term, commit_index);
  146. // Do not use ticks to clear requests if we have not committed at least
  147. // one log entry since the start of the term. At the start of the term,
  148. // the leader might not know the commit index of the previous leader.
  149. // This holds true even it is guaranteed that all entries committed by
  150. // the previous leader will be committed by the current leader.
  151. if commit_index >= sentinel_commit_index {
  152. self.clear_ticked_requests();
  153. }
  154. self.removed_expired_requests(current_term);
  155. }
  156. /// Clears all requests that have seen at least one commit.
  157. /// This function handles the following scenario: a verify authority request
  158. /// was received, when the `commit_index` was at C. Later as the leader we
  159. /// moved the commit index to at least C+1. That implies that when the
  160. /// request was first received, no other new commits after C could have been
  161. /// added to the log, either by this replica or others. It then follows that
  162. /// we can claim we had authority at that point.
  163. fn clear_committed_requests(
  164. &self,
  165. current_term: Term,
  166. commit_index: Index,
  167. ) {
  168. let mut state = self.state.lock();
  169. if current_term != state.term {
  170. return;
  171. }
  172. // Note the commit_index in the queue might not be in increasing order.
  173. // We could still have requests that have a smaller commit_index after
  174. // this sweep. That is an acceptable tradeoff we are taking.
  175. while let Some(head) = state.queue.pop_front() {
  176. if head.commit_index >= commit_index {
  177. state.queue.push_front(head);
  178. break;
  179. }
  180. let _ = head
  181. .sender
  182. .send(VerifyAuthorityResult::Success(head.commit_index));
  183. state.start.0 += 1;
  184. }
  185. }
  186. /// Fetches the newest successful RPC response from peers, and mark verify
  187. /// authority requests as complete if they are covered by more than half of
  188. /// the replicas.
  189. fn clear_ticked_requests(&self) {
  190. for (peer_index, beat_ticker) in self.beat_tickers.iter().enumerate() {
  191. // Fetches the newest successful RPC response from the current peer.
  192. let ticked = beat_ticker.ticked();
  193. let mut state = self.state.lock();
  194. // Update progress with `ticked`. All requests that came before
  195. // `ticked` now have one more votes of leader authority from the
  196. // current peer.
  197. let first_not_ticked_index = state.queue.partition_point(|token| {
  198. token.beats_moment[peer_index] <= ticked
  199. });
  200. assert!(first_not_ticked_index >= state.covered[peer_index].0);
  201. state.covered[peer_index].0 = first_not_ticked_index;
  202. // Count the requests that has more than N / 2 votes. We always have
  203. // the vote from ourselves, but the value is 0 in `covered` array.
  204. let mut sorted_covered = state.covered.to_owned();
  205. sorted_covered.sort_unstable();
  206. let mid = sorted_covered.len() / 2 + 1;
  207. let new_start = sorted_covered[mid];
  208. // All requests before `new_start` is now verified.
  209. let verified = new_start.0 - state.start.0;
  210. for token in state.queue.drain(..verified) {
  211. for (index, beat) in token.beats_moment.iter().enumerate() {
  212. assert!(self.beat_tickers[index].ticked() >= *beat);
  213. }
  214. let _ = token
  215. .sender
  216. .send(VerifyAuthorityResult::Success(token.commit_index));
  217. }
  218. // Move the queue starting point.
  219. state.start = new_start;
  220. }
  221. }
  222. const VERIFY_AUTHORITY_REQUEST_EXPIRATION: Duration =
  223. Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS * 2);
  224. /// Remove expired requests if we are no longer the leader.
  225. /// If we have lost leadership, we are unlikely to receive confirmations
  226. /// of past leadership state from peers. Requests are expired after two
  227. /// heartbeat period have passed. We do not immediately cancel all incoming
  228. /// requests, in hope that we could still answer them accurately without
  229. /// breaking the consistency guarantee.
  230. fn removed_expired_requests(&self, current_term: Term) {
  231. let mut state = self.state.lock();
  232. // Return if we are still the leader, or we become the leader again.
  233. //
  234. // Note that we do not hold the main raft state lock, thus the value of
  235. // `current_term` might not be up-to-date. We only update `state.term`
  236. // after an election. If in a term after `current_term`, we are elected
  237. // leader again, `state.term` could be updated and thus greater than the
  238. // (now stale) `current_term`. In that case, the queue should have been
  239. // reset. There will be no expired request to remove.
  240. if state.term >= current_term {
  241. return;
  242. }
  243. let expiring_line =
  244. Instant::now() - Self::VERIFY_AUTHORITY_REQUEST_EXPIRATION;
  245. // Assuming bounded clock skew, otherwise we will lose efficiency.
  246. let expired =
  247. |head: &VerifyAuthorityToken| head.rough_time < expiring_line;
  248. // Note rough_time might not be in increasing order, so we might still
  249. // have requests that are expired in the queue after the sweep.
  250. while state.queue.front().map_or(false, expired) {
  251. state
  252. .queue
  253. .pop_front()
  254. .map(|head| head.sender.send(VerifyAuthorityResult::TimedOut));
  255. state.start.0 += 1;
  256. }
  257. }
  258. pub fn beat_ticker(&self, peer_index: usize) -> SharedBeatTicker {
  259. self.beat_tickers[peer_index].clone()
  260. }
  261. pub fn kill(&self) {
  262. if let Some(unparker) = self.unparker.as_ref() {
  263. unparker.unpark();
  264. }
  265. }
  266. }
  267. impl<Command: 'static + Send> Raft<Command> {
  268. const BEAT_RECORDING_MAX_PAUSE: Duration = Duration::from_millis(20);
  269. /// Create a thread and runs the verify authority daemon.
  270. pub(crate) fn run_verify_authority_daemon(&mut self) {
  271. let parker = Parker::new();
  272. let unparker = parker.unparker().clone();
  273. self.verify_authority_daemon.unparker.replace(unparker);
  274. let me = self.me.clone();
  275. let keep_running = self.keep_running.clone();
  276. let daemon_env = self.daemon_env.clone();
  277. let this_daemon = self.verify_authority_daemon.clone();
  278. let rf = self.inner_state.clone();
  279. let stop_wait_group = self.stop_wait_group.clone();
  280. let join_handle = std::thread::spawn(move || {
  281. // Note: do not change this to `let _ = ...`.
  282. let _guard = daemon_env.for_scope();
  283. log::info!("{:?} verify authority daemon running ...", me);
  284. while keep_running.load(Ordering::Acquire) {
  285. parker.park_timeout(Self::BEAT_RECORDING_MAX_PAUSE);
  286. let (current_term, commit_index, sentinel) = {
  287. let rf = rf.lock();
  288. (rf.current_term, rf.commit_index, rf.sentinel_commit_index)
  289. };
  290. this_daemon.run_verify_authority_iteration(
  291. current_term,
  292. commit_index,
  293. sentinel,
  294. );
  295. }
  296. log::info!("{:?} verify authority daemon done.", me);
  297. drop(stop_wait_group);
  298. });
  299. self.daemon_env
  300. .watch_daemon(Daemon::VerifyAuthority, join_handle);
  301. }
  302. /// Create a verify authority request. Returns None if we are not the
  303. /// leader.
  304. ///
  305. /// A successful verification allows the application to respond to read-only
  306. /// requests that arrived before this function is called. The answer must
  307. /// include all commands at or before a certain index, which is returned to
  308. /// the application with the successful verification result. The index is
  309. /// in fact the commit index at the moment this function was called. It is
  310. /// guaranteed that no other commands could possibly have been committed at
  311. /// the moment this function was called.
  312. ///
  313. /// The application is also free to include any subsequent commits in the
  314. /// response. Consistency is still guaranteed, because Raft never rolls back
  315. /// committed commands.
  316. pub fn verify_authority_async(
  317. &self,
  318. ) -> Option<impl Future<Output = crate::VerifyAuthorityResult>> {
  319. let (term, commit_index) = {
  320. let rf = self.inner_state.lock();
  321. if !rf.is_leader() {
  322. return None;
  323. }
  324. (rf.current_term, rf.commit_index)
  325. };
  326. let receiver = self
  327. .verify_authority_daemon
  328. .verify_authority_async(term, commit_index);
  329. self.heartbeats_daemon.trigger();
  330. receiver.map(|receiver| async move {
  331. receiver
  332. .await
  333. .expect("Verify authority daemon never drops senders")
  334. })
  335. }
  336. pub(crate) fn beat_ticker(&self, peer_index: usize) -> SharedBeatTicker {
  337. self.verify_authority_daemon.beat_ticker(peer_index)
  338. }
  339. }