//! `election.rs`: the election timer and the leader election (prevote, then
//! vote) logic of the Raft implementation.

  1. use std::sync::atomic::Ordering;
  2. use std::sync::Arc;
  3. use std::time::{Duration, Instant};
  4. use parking_lot::{Condvar, Mutex};
  5. use rand::{thread_rng, Rng};
  6. use crate::remote_context::RemoteContext;
  7. use crate::sync_log_entries::SyncLogEntriesComms;
  8. use crate::utils::{retry_rpc, RPC_DEADLINE};
  9. use crate::verify_authority::VerifyAuthorityDaemon;
  10. use crate::{
  11. IndexTerm, Peer, Persister, Raft, RaftState, ReplicableCommand,
  12. RequestVoteArgs, State, Term,
  13. };
  /// The election deadline paired with a version counter.
  ///
  /// The version is bumped every time the timer is reset, stopped or fires, so
  /// that the timer daemon can tell whether the deadline it observed earlier is
  /// still the current one.
  struct VersionedDeadline {
      /// Incremented on every reset, stop or firing of the timer.
      version: usize,
      /// The instant at which the timer fires. `None` means the timer is not
      /// armed.
      deadline: Option<Instant>,
  }
  /// Shared state of the election timer: the versioned deadline, and the
  /// condition variable used to wake up the timer daemon when it changes.
  pub(crate) struct ElectionState {
      // The deadline inside is removed (set to `None`) when the timer is
      // stopped, i.e. upon shutdown or after being elected.
      timer: Mutex<VersionedDeadline>,
      // Wakes up the timer thread when the timer is reset or cancelled.
      signal: Condvar,
  }
  /// Lower bound of the randomized election timeout, in milliseconds.
  const ELECTION_TIMEOUT_BASE_MILLIS: u64 = 200;
  /// Width of the random jitter added on top of the base timeout, in
  /// milliseconds. The jitter is drawn from `0..ELECTION_TIMEOUT_VAR_MILLIS`,
  /// so the upper end is exclusive.
  const ELECTION_TIMEOUT_VAR_MILLIS: u64 = 200;
  26. impl ElectionState {
  27. pub(crate) fn create() -> Self {
  28. Self {
  29. timer: Mutex::new(VersionedDeadline {
  30. version: 0,
  31. deadline: None,
  32. }),
  33. signal: Condvar::new(),
  34. }
  35. }
  36. pub(crate) fn reset_election_timer(&self) {
  37. let mut guard = self.timer.lock();
  38. guard.version += 1;
  39. guard.deadline.replace(Self::election_timeout());
  40. self.signal.notify_one();
  41. }
  42. fn try_reset_election_timer(&self, timer_count: usize) -> bool {
  43. let mut guard = self.timer.lock();
  44. if guard.version != timer_count {
  45. return false;
  46. }
  47. guard.version += 1;
  48. guard.deadline.replace(Self::election_timeout());
  49. self.signal.notify_one();
  50. true
  51. }
  52. fn election_timeout() -> Instant {
  53. Instant::now()
  54. + Duration::from_millis(
  55. ELECTION_TIMEOUT_BASE_MILLIS
  56. + thread_rng().gen_range(0..ELECTION_TIMEOUT_VAR_MILLIS),
  57. )
  58. }
  59. pub(crate) fn stop_election_timer(&self) {
  60. let mut guard = self.timer.lock();
  61. guard.version += 1;
  62. guard.deadline.take();
  63. self.signal.notify_one();
  64. }
  65. }
  /// A helper struct that stores all the information needed
  /// to become the leader.
  ///
  /// It is assembled by `schedule_election()` from the fields of `Raft` and
  /// moved into the spawned election task.
  struct ElectionCandidate<Command> {
      /// This peer, i.e. the one running for leader.
      me: Peer,
      /// The peers to ask for a vote. The entry equal to `me` is skipped.
      peers: Vec<Peer>,
      /// The shared Raft state, guarded by the raft lock.
      rf: Arc<Mutex<RaftState<Command>>>,
      /// The election timer, stopped once this candidate becomes the leader.
      election: Arc<ElectionState>,
      /// Log-sync comms whose progress is reset when becoming the leader.
      new_log_entry: SyncLogEntriesComms,
      /// Verify-authority daemon whose state is reset when becoming the leader.
      verify_authority_daemon: VerifyAuthorityDaemon,
      /// Used to persist the Raft state after the term or the log is changed.
      persister: Arc<dyn Persister>,
      /// Runtime handle on which the vote-requesting tasks are spawned.
      thread_pool: tokio::runtime::Handle,
  }
  /// Result of one round of voting. The cancellation token is handed back
  /// when the round was accepted, so that it can guard the next round.
  enum QuorumOrCancelled {
      /// Enough votes were granted. Carries the still-pending cancellation
      /// token.
      Accepted(futures_channel::oneshot::Receiver<()>),
      /// The round ended without enough granted votes.
      Rejected,
      /// The cancellation token resolved before the round was decided.
      Cancelled,
  }
  // `Command` must be
  // 0. `'static`: `Raft<Command>` must be `'static` because it is moved to
  //    another thread.
  // 1. `Clone`: commands are copied to the persister.
  // 2. `Send`: `Arc<Mutex<Vec<LogEntry<Command>>>>` must be `Send` because it
  //    is moved to another thread.
  // 3. `Serialize`: commands are converted to bytes to be persisted.
  89. impl<Command: ReplicableCommand> Raft<Command> {
  /// Runs the election timer daemon that triggers elections.
  ///
  /// The daemon holds a counter and an optional deadline in a mutex. Each
  /// time the timer is reset, the counter is increased by one. The deadline
  /// will be replaced with a randomized timeout. No other data is held.
  ///
  /// The daemon runs in a loop. In each iteration, the timer either fires, or
  /// is reset. At the beginning of each iteration, a new election will be
  /// started if
  /// 1. In the last iteration, the timer fired, and
  /// 2. Since the fired timer was set until now, the timer has not been
  ///    reset, i.e. the counter has not been updated.
  ///
  /// If both conditions are met, an election is started. We keep a cancel
  /// token for the running election. Canceling is a courtesy and does not
  /// impact correctness. A should-have-been-cancelled election would cancel
  /// itself after counting enough votes ("term has changed").
  ///
  /// In each election, the first thing that happens is resetting the election
  /// timer. This reset and condition 2 above are tested and applied in the
  /// same atomic operation. Then one RPC is sent to each peer, asking for a
  /// vote. A task is created to wait for those RPCs to return and then count
  /// the votes.
  ///
  /// At the same time, the daemon locks the counter and the timeout. It
  /// expects the counter to increase by 1 but no more than that. If that
  /// expectation is not met, the daemon knows the election either did not
  /// happen, or the timer has been reset after the election started. In that
  /// case it considers the timer not fired and skips the wait described
  /// below.
  ///
  /// If the expectation is met, the daemon waits until the timer fires, or
  /// the timer is reset, whichever happens first. If both have happened when
  /// the daemon wakes up, the reset takes precedence and the timer is
  /// considered not fired. The result (timer fired or is reset) is recorded
  /// so that it can be used in the next iteration.
  ///
  /// The daemon cancels the running election after waking up, no matter what
  /// happens. The iteration ends here.
  ///
  /// Before the first iteration, the timer is considered reset and not fired.
  ///
  /// The vote-counting task operates independently of the daemon. If it
  /// collects enough votes and the term has not yet passed, it stops the
  /// election timer, which bumps the counter just like a reset does. There
  /// could be more than one vote-counting task running at the same time, but
  /// all earlier tasks except the newest one will eventually realize the term
  /// they were competing for has passed and quit.
  ///
  /// # Panics
  ///
  /// Panics if an election was due but the timer counter observed afterwards
  /// is smaller than the counter recorded when the timer fired plus one.
  pub(crate) fn run_election_timer(&self) {
      log::info!("{:?} election timer daemon running ...", self.me);

      let election = self.election.clone();

      // `Some(counter)` when the timer fired in the last iteration, holding
      // the counter value set at that moment. `None` otherwise.
      let mut should_run = None;
      while self.keep_running.load(Ordering::Relaxed) {
          let mut cancel_handle = should_run.and_then(|last_timer_count| {
              // Election can only be run when we are **not** holding the
              // election timer lock. The reason is that scheduling a next
              // election requires the rf lock. We always acquire the rf lock
              // before the election timer lock in every other place. Here we
              // must follow the same pattern to avoid deadlock: dropping the
              // election timer lock then acquiring the two locks in order.
              self.schedule_election(last_timer_count)
          });

          let mut guard = election.timer.lock();
          let (timer_count, deadline) = (guard.version, guard.deadline);
          // If the timer is reset
          // 0. Zero times. We know should_run is None. If should_run has
          //    a value, the election would have been started and the timer
          //    reset by the election. That means the timer did not fire in
          //    the last iteration. We should just wait.
          // 1. One time. We know that the timer is either reset by the
          //    election or by someone else before the election, in which
          //    case the election was never started. We should just wait.
          // 2. More than one time. We know that the timer is first reset
          //    by the election, and then reset by someone else, in that
          //    order. We should cancel the election and just wait.
          if let Some(last_timer_count) = should_run {
              let expected_timer_count = last_timer_count + 1;
              assert!(timer_count >= expected_timer_count);
              // If the timer was changed more than once, we know the
              // last scheduled election should have been cancelled.
              if timer_count > expected_timer_count {
                  cancel_handle.take().map(|c| c.send(()));
              }
          }
          // Check the running signal before sleeping. We are holding the
          // timer lock, so no one can change it. The kill() method will
          // not be able to notify this thread before `wait` is called.
          if !self.keep_running.load(Ordering::Relaxed) {
              cancel_handle.take().map(|c| c.send(()));
              break;
          }
          should_run = match deadline {
              Some(timeout) => loop {
                  let ret = election.signal.wait_until(&mut guard, timeout);
                  // Only count the timer as fired if the wait timed out and
                  // the deadline has really passed.
                  let fired = ret.timed_out() && Instant::now() > timeout;
                  // If the timer has been updated, do not schedule,
                  // break so that we could cancel.
                  if timer_count != guard.version {
                      // Timer has been updated, cancel current
                      // election, and block on timeout again.
                      break None;
                  } else if fired {
                      // Timer has fired, remove the timer and allow
                      // running the next election at timer_count.
                      // If the next election is cancelled before we
                      // are back on wait, timer_count will be set to
                      // a different value.
                      guard.version += 1;
                      guard.deadline.take();
                      break Some(guard.version);
                  }
                  // Alarm has not fired yet. Continue to wait.
              },
              None => {
                  // Nothing to do. Block here until the timer is reset.
                  // We will not block here indefinitely at shutdown, since
                  // we checked that keep_running is still true while holding
                  // the election lock.
                  election.signal.wait(&mut guard);
                  // The timeout has changed, check again.
                  None
              }
          };
          drop(guard);
          // Whenever woken up, cancel the current running election.
          // There are 3 cases we could reach here
          // 1. We received an AppendEntries, or decided to vote for
          //    a peer, and thus turned into a follower. In this case we'll
          //    be notified by the election signal.
          // 2. We are a follower but did not receive a heartbeat on time,
          //    or we are a candidate but did not collect enough votes on
          //    time. In this case we'll have a timeout.
          // 3. We became the leader, or are shutting down. In this case
          //    we'll be notified by the election signal.
          cancel_handle.map(|c| c.send(()));
      }

      log::info!("{:?} election timer daemon done.", self.me);
  }
  227. fn schedule_election(
  228. &self,
  229. timer_count: usize,
  230. ) -> Option<futures_channel::oneshot::Sender<()>> {
  231. let (term, last_log_index) = {
  232. let mut rf = self.inner_state.lock();
  233. // The previous election is faster and reached the critical section
  234. // before us. We should stop and not run this election.
  235. // Or someone else increased the term and the timer is reset.
  236. if !self.election.try_reset_election_timer(timer_count) {
  237. return None;
  238. }
  239. rf.state = State::Prevote;
  240. (rf.current_term, rf.log.last_index_term())
  241. };
  242. let (tx, rx) = futures_channel::oneshot::channel();
  243. let candidate = ElectionCandidate {
  244. me: self.me,
  245. peers: self.peers.clone(),
  246. rf: self.inner_state.clone(),
  247. election: self.election.clone(),
  248. new_log_entry: self.sync_log_entries_comms.clone(),
  249. verify_authority_daemon: self.verify_authority_daemon.clone(),
  250. persister: self.persister.clone(),
  251. thread_pool: self.thread_pool.clone(),
  252. };
  253. self.thread_pool.spawn(Self::run_election(
  254. term,
  255. candidate,
  256. last_log_index,
  257. rx,
  258. ));
  259. Some(tx)
  260. }
  261. async fn run_election(
  262. term: Term,
  263. candidate: ElectionCandidate<Command>,
  264. last_index_term: IndexTerm,
  265. cancel_token: futures_channel::oneshot::Receiver<()>,
  266. ) {
  267. let me = candidate.me;
  268. let prevote_args = RequestVoteArgs {
  269. term,
  270. candidate_id: me,
  271. last_log_index: last_index_term.index,
  272. last_log_term: last_index_term.term,
  273. prevote: true,
  274. };
  275. // Run the prevote phase first.
  276. let prevotes = Self::spawn_request_votes(&candidate, prevote_args);
  277. let prevote_results =
  278. Self::quorum_before_cancelled(prevotes, cancel_token).await;
  279. let cancel_token = match prevote_results {
  280. QuorumOrCancelled::Accepted(cancel_token) => cancel_token,
  281. // Did not get quorum on a prevote. Skip the rest.
  282. _ => return,
  283. };
  284. // Advance to the next term.
  285. let (next_term, (last_log_index, last_log_term)) = {
  286. let mut rf = candidate.rf.lock();
  287. if term != rf.current_term || rf.state != State::Prevote {
  288. return;
  289. }
  290. rf.current_term.0 += 1;
  291. rf.voted_for = Some(me);
  292. rf.state = State::Candidate;
  293. candidate.persister.save_state(rf.persisted_state().into());
  294. (rf.current_term, rf.log.last_index_term().unpack())
  295. };
  296. let args = RequestVoteArgs {
  297. term: next_term,
  298. candidate_id: me,
  299. last_log_index,
  300. last_log_term,
  301. prevote: false,
  302. };
  303. let votes = Self::spawn_request_votes(&candidate, args);
  304. if let QuorumOrCancelled::Accepted(_) =
  305. Self::quorum_before_cancelled(votes, cancel_token).await
  306. {
  307. Self::become_leader(next_term, candidate);
  308. }
  309. }
  310. const REQUEST_VOTE_RETRY: usize = 1;
  311. async fn request_vote(peer: Peer, args: RequestVoteArgs) -> Option<bool> {
  312. let term = args.term;
  313. // See the comment in send_heartbeat() on why a reference is used.
  314. let rpc_client = RemoteContext::<Command>::rpc_client(peer);
  315. let reply =
  316. retry_rpc(Self::REQUEST_VOTE_RETRY, RPC_DEADLINE, move |_round| {
  317. rpc_client.request_vote(args.clone())
  318. })
  319. .await;
  320. if let Ok(reply) = reply {
  321. RemoteContext::<Command>::term_marker().mark(reply.term);
  322. return Some(reply.vote_granted && reply.term == term);
  323. }
  324. None
  325. }
  326. fn spawn_request_votes(
  327. candidate: &ElectionCandidate<Command>,
  328. args: RequestVoteArgs,
  329. ) -> Vec<tokio::task::JoinHandle<Option<bool>>> {
  330. let mut votes = vec![];
  331. for peer in candidate.peers.clone().into_iter() {
  332. if peer != candidate.me {
  333. let one_vote = candidate
  334. .thread_pool
  335. .spawn(Self::request_vote(peer, args.clone()));
  336. votes.push(one_vote);
  337. }
  338. }
  339. return votes;
  340. }
  341. async fn quorum_before_cancelled(
  342. votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
  343. cancel_token: futures_channel::oneshot::Receiver<()>,
  344. ) -> QuorumOrCancelled {
  345. let quorum = votes.len() >> 1;
  346. let mut vote_count = 0;
  347. let mut against_count = 0;
  348. let mut cancel_token = cancel_token;
  349. let mut futures_vec = votes;
  350. while vote_count < quorum
  351. && against_count <= quorum
  352. && !futures_vec.is_empty()
  353. {
  354. // Mixing tokio futures with futures-rs ones. Fingers crossed.
  355. let selected = futures_util::future::select(
  356. cancel_token,
  357. futures_util::future::select_all(futures_vec),
  358. )
  359. .await;
  360. let ((one_vote, _, rest), new_token) = match selected {
  361. futures_util::future::Either::Left(_) => {
  362. return QuorumOrCancelled::Cancelled
  363. }
  364. futures_util::future::Either::Right(tuple) => tuple,
  365. };
  366. futures_vec = rest;
  367. cancel_token = new_token;
  368. if let Ok(Some(vote)) = one_vote {
  369. if vote {
  370. vote_count += 1
  371. } else {
  372. against_count += 1
  373. }
  374. }
  375. }
  376. return if vote_count < quorum {
  377. QuorumOrCancelled::Rejected
  378. } else {
  379. QuorumOrCancelled::Accepted(cancel_token)
  380. };
  381. }
  382. fn become_leader(term: Term, this: ElectionCandidate<Command>) {
  383. let me = this.me;
  384. log::info!("{me:?} voted as leader at term {term:?}");
  385. let mut rf = this.rf.lock();
  386. if rf.current_term == term && rf.state == State::Candidate {
  387. // We are the leader now. The election timer can be stopped.
  388. this.election.stop_election_timer();
  389. rf.state = State::Leader;
  390. rf.leader_id = me;
  391. rf.match_index.fill(0);
  392. let sentinel_commit_index;
  393. if rf.commit_index != rf.log.last_index_term().index {
  394. // Create a sentinel commit at the start of the term.
  395. sentinel_commit_index = rf.log.add_term_change_entry(term);
  396. this.persister.save_state(rf.persisted_state().into());
  397. } else {
  398. sentinel_commit_index = rf.commit_index;
  399. }
  400. // Reset the verify authority daemon before sending heartbeats to
  401. // followers. This is critical to the correctness of verifying
  402. // authority.
  403. // No verity authority request can go through before the reset is
  404. // done, since we are holding the raft lock.
  405. this.verify_authority_daemon
  406. .reset_state(term, sentinel_commit_index);
  407. // Sync all logs now.
  408. this.new_log_entry
  409. .reset_progress(term, sentinel_commit_index);
  410. }
  411. }
  412. }