// election.rs — Raft leader election: the election-timer daemon,
// RequestVote RPCs, and vote counting.
// Standard library.
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant};

// Third-party: parking_lot for faster locks, rand for timeout jitter.
use parking_lot::{Condvar, Mutex};
use rand::{thread_rng, Rng};

// Project-local types.
use crate::daemon_env::Daemon;
use crate::term_marker::TermMarker;
use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
use crate::verify_authority::VerifyAuthorityDaemon;
use crate::{
    LogEntryEnum, Peer, Persister, Raft, RaftState, RemoteRaft,
    RequestVoteArgs, State, Term,
};
/// Shared state behind the election timer.
///
/// The tuple in `timer` is `(generation_counter, deadline)`:
/// the counter is bumped on every reset or stop, which lets readers detect
/// that the timer changed while they were waiting; the deadline is `None`
/// when the timer is disarmed.
#[derive(Default)]
pub(crate) struct ElectionState {
    // Timer will be removed upon shutdown or elected.
    timer: Mutex<(usize, Option<Instant>)>,
    // Wake up the timer thread when the timer is reset or cancelled.
    signal: Condvar,
}

// Minimum election timeout, in milliseconds.
const ELECTION_TIMEOUT_BASE_MILLIS: u64 = 200;
// Width of the random jitter added on top of the base, in milliseconds.
const ELECTION_TIMEOUT_VAR_MILLIS: u64 = 200;
  23. impl ElectionState {
  24. pub(crate) fn create() -> Self {
  25. Self {
  26. timer: Mutex::new((0, None)),
  27. signal: Condvar::new(),
  28. }
  29. }
  30. pub(crate) fn reset_election_timer(&self) {
  31. let mut guard = self.timer.lock();
  32. guard.0 += 1;
  33. guard.1.replace(Self::election_timeout());
  34. self.signal.notify_one();
  35. }
  36. fn try_reset_election_timer(&self, timer_count: usize) -> bool {
  37. let mut guard = self.timer.lock();
  38. if guard.0 != timer_count {
  39. return false;
  40. }
  41. guard.0 += 1;
  42. guard.1.replace(Self::election_timeout());
  43. self.signal.notify_one();
  44. true
  45. }
  46. fn election_timeout() -> Instant {
  47. Instant::now()
  48. + Duration::from_millis(
  49. ELECTION_TIMEOUT_BASE_MILLIS
  50. + thread_rng().gen_range(0..ELECTION_TIMEOUT_VAR_MILLIS),
  51. )
  52. }
  53. pub(crate) fn stop_election_timer(&self) {
  54. let mut guard = self.timer.lock();
  55. guard.0 += 1;
  56. guard.1.take();
  57. self.signal.notify_one();
  58. }
  59. }
  60. // Command must be
  61. // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
  62. // 1. clone: they are copied to the persister.
  63. // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
  64. // 3. serialize: they are converted to bytes to persist.
  65. impl<Command> Raft<Command>
  66. where
  67. Command: 'static + Clone + Default + Send + serde::Serialize,
  68. {
    /// Runs the election timer daemon that triggers elections.
    ///
    /// The daemon holds a counter and an optional deadline in a mutex. Each
    /// time the timer is reset, the counter is increased by one. The deadline
    /// will be replaced with a randomized timeout. No other data is held.
    ///
    /// The daemon runs in a loop. In each iteration, the timer either fires,
    /// or is reset. At the beginning of each iteration, a new election will
    /// be started if
    /// 1. In the last iteration, the timer fired, and
    /// 2. Since the fired timer was set until now, the timer has not been
    /// reset, i.e. the counter has not been updated.
    ///
    /// If both conditions are met, an election is started. We keep a cancel
    /// token for the running election. Canceling is a courtesy and does not
    /// impact correctness. A should-have-been-cancelled election would cancel
    /// itself after counting enough votes ("term has changed").
    ///
    /// In each election, the first thing that happens is resetting the
    /// election timer. This reset and condition 2 above is tested and applied
    /// in the same atomic operation. Then one RPC is sent to each peer,
    /// asking for a vote. A task is created to wait for those RPCs to return
    /// and then count the votes.
    ///
    /// At the same time, the daemon locks the counter and the timeout. It
    /// expects the counter to increase by 1 but no more than that. If that
    /// expectation is not met, the daemon knows the election either did not
    /// happen, or the timer has been reset after the election starts. In that
    /// case it considers the timer not fired and skips the wait described
    /// below.
    ///
    /// If the expectation is met, the daemon waits until the timer fires, or
    /// the timer is reset, whichever happens first. If both happen when the
    /// daemon wakes up, the reset takes precedence and the timer is
    /// considered not fired. The result (timer fired or is reset) is recorded
    /// so that it could be used in the next iteration.
    ///
    /// The daemon cancels the running election after waking up, no matter
    /// what happens. The iteration ends here.
    ///
    /// Before the first iteration, the timer is considered reset and not
    /// fired.
    ///
    /// The vote-counting task operates independently of the daemon. If it
    /// collects enough votes and the term has not yet passed, it resets the
    /// election timer. There could be more than one vote-counting task
    /// running at the same time, but all earlier tasks except the newest one
    /// will eventually realize the term they were competing for has passed
    /// and quit.
    pub(crate) fn run_election_timer(&self) {
        let this = self.clone();
        let join_handle = std::thread::spawn(move || {
            // Note: do not change this to `let _ = ...`: the guard must
            // stay alive for the whole thread, not be dropped immediately.
            let _guard = this.daemon_env.for_scope();
            log::info!("{:?} election timer daemon running ...", this.me);
            let election = this.election.clone();
            // `Some(count)` means the timer fired at generation `count`
            // during the previous iteration; an election should start now.
            let mut should_run = None;
            while this.keep_running.load(Ordering::SeqCst) {
                // Start an election if the timer fired last iteration.
                // `run_election` returns a cancel token, or `None` if the
                // timer was reset before the election could start.
                let mut cancel_handle =
                    should_run.and_then(|last_timer_count| {
                        this.run_election(last_timer_count)
                    });
                let mut guard = election.timer.lock();
                let (timer_count, deadline) = *guard;
                // If the timer is reset
                // 0. Zero times. We know should_run is None. If should_run
                // had a value, the election would have been started and the
                // timer reset by the election. That means the timer did not
                // fire in the last iteration. We should just wait.
                // 1. One time. We know that the timer is either reset by the
                // election or by someone else before the election, in which
                // case the election was never started. We should just wait.
                // 2. More than one time. We know that the timer is first
                // reset by the election, and then reset by someone else, in
                // that order. We should cancel the election and just wait.
                if let Some(last_timer_count) = should_run {
                    let expected_timer_count = last_timer_count + 1;
                    assert!(timer_count >= expected_timer_count);
                    // If the timer was changed more than once, we know the
                    // last scheduled election should have been cancelled.
                    if timer_count > expected_timer_count {
                        cancel_handle.take().map(|c| c.send(()));
                    }
                }
                // Check the running signal before sleeping. We are holding
                // the timer lock, so no one can change it. The kill() method
                // will not be able to notify this thread before `wait` is
                // called.
                if !this.keep_running.load(Ordering::SeqCst) {
                    break;
                }
                should_run = match deadline {
                    Some(timeout) => loop {
                        let ret =
                            election.signal.wait_until(&mut guard, timeout);
                        // Deadline passed AND nobody signalled us first.
                        let fired =
                            ret.timed_out() && Instant::now() > timeout;
                        // If the timer has been updated, do not schedule,
                        // break so that we could cancel.
                        if timer_count != guard.0 {
                            // Timer has been updated, cancel current
                            // election, and block on timeout again.
                            break None;
                        } else if fired {
                            // Timer has fired, remove the timer and allow
                            // running the next election at timer_count.
                            // If the next election is cancelled before we
                            // are back on wait, timer_count will be set to
                            // a different value.
                            guard.0 += 1;
                            guard.1.take();
                            break Some(guard.0);
                        }
                        // Spurious wakeup: neither fired nor reset — wait
                        // again on the same deadline.
                    },
                    None => {
                        // Timer disarmed: sleep until someone re-arms it.
                        election.signal.wait(&mut guard);
                        // The timeout has changed, check again.
                        None
                    }
                };
                drop(guard);
                // Whenever woken up, cancel the current running election.
                // There are 3 cases we could reach here
                // 1. We received an AppendEntries, or decided to vote for
                // a peer, and thus turned into a follower. In this case
                // we'll be notified by the election signal.
                // 2. We are a follower but didn't receive a heartbeat on
                // time, or we are a candidate but didn't collect enough
                // votes on time. In this case we'll have a timeout.
                // 3. We became a leader, or are shut down. In this case
                // we'll be notified by the election signal.
                cancel_handle.map(|c| c.send(()));
            }
            log::info!("{:?} election timer daemon done.", this.me);
            let stop_wait_group = this.stop_wait_group.clone();
            // Making sure the rest of `this` is dropped before the wait
            // group.
            drop(this);
            drop(stop_wait_group);
        });
        self.daemon_env
            .watch_daemon(Daemon::ElectionTimer, join_handle);
    }
  207. fn run_election(
  208. &self,
  209. timer_count: usize,
  210. ) -> Option<futures_channel::oneshot::Sender<()>> {
  211. let me = self.me;
  212. let (term, args) = {
  213. let mut rf = self.inner_state.lock();
  214. // The previous election is faster and reached the critical section
  215. // before us. We should stop and not run this election.
  216. // Or someone else increased the term and the timer is reset.
  217. if !self.election.try_reset_election_timer(timer_count) {
  218. return None;
  219. }
  220. rf.current_term.0 += 1;
  221. rf.voted_for = Some(me);
  222. rf.state = State::Candidate;
  223. self.persister.save_state(rf.persisted_state().into());
  224. let term = rf.current_term;
  225. let (last_log_index, last_log_term) =
  226. rf.log.last_index_term().unpack();
  227. (
  228. term,
  229. RequestVoteArgs {
  230. term,
  231. candidate_id: me,
  232. last_log_index,
  233. last_log_term,
  234. },
  235. )
  236. };
  237. let mut votes = vec![];
  238. let term_marker = self.term_marker();
  239. for (index, rpc_client) in self.peers.iter().enumerate() {
  240. if index != self.me.0 {
  241. // RpcClient must be cloned so that it lives long enough for
  242. // spawn(), which requires static life time.
  243. // RPCs are started right away.
  244. let one_vote = self.thread_pool.spawn(Self::request_vote(
  245. rpc_client.clone(),
  246. args.clone(),
  247. term_marker.clone(),
  248. ));
  249. votes.push(one_vote);
  250. }
  251. }
  252. let (tx, rx) = futures_channel::oneshot::channel();
  253. self.thread_pool.spawn(Self::count_vote_util_cancelled(
  254. me,
  255. term,
  256. self.inner_state.clone(),
  257. votes,
  258. rx,
  259. self.election.clone(),
  260. self.new_log_entry.clone().unwrap(),
  261. self.verify_authority_daemon.clone(),
  262. self.persister.clone(),
  263. ));
  264. Some(tx)
  265. }
  266. const REQUEST_VOTE_RETRY: usize = 1;
  267. async fn request_vote(
  268. rpc_client: impl RemoteRaft<Command>,
  269. args: RequestVoteArgs,
  270. term_marker: TermMarker<Command>,
  271. ) -> Option<bool> {
  272. let term = args.term;
  273. // See the comment in send_heartbeat() for this override.
  274. let rpc_client = &rpc_client;
  275. let reply =
  276. retry_rpc(Self::REQUEST_VOTE_RETRY, RPC_DEADLINE, move |_round| {
  277. rpc_client.request_vote(args.clone())
  278. })
  279. .await;
  280. if let Ok(reply) = reply {
  281. term_marker.mark(reply.term);
  282. return Some(reply.vote_granted && reply.term == term);
  283. }
  284. None
  285. }
  286. #[allow(clippy::too_many_arguments)]
  287. async fn count_vote_util_cancelled(
  288. me: Peer,
  289. term: Term,
  290. rf: Arc<Mutex<RaftState<Command>>>,
  291. votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
  292. cancel_token: futures_channel::oneshot::Receiver<()>,
  293. election: Arc<ElectionState>,
  294. new_log_entry: SharedSender<Option<Peer>>,
  295. verify_authority_daemon: VerifyAuthorityDaemon,
  296. persister: Arc<dyn Persister>,
  297. ) {
  298. let quorum = votes.len() >> 1;
  299. let mut vote_count = 0;
  300. let mut against_count = 0;
  301. let mut cancel_token = cancel_token;
  302. let mut futures_vec = votes;
  303. while vote_count < quorum
  304. && against_count <= quorum
  305. && !futures_vec.is_empty()
  306. {
  307. // Mixing tokio futures with futures-rs ones. Fingers crossed.
  308. let selected = futures_util::future::select(
  309. cancel_token,
  310. futures_util::future::select_all(futures_vec),
  311. )
  312. .await;
  313. let ((one_vote, _, rest), new_token) = match selected {
  314. futures_util::future::Either::Left(_) => break,
  315. futures_util::future::Either::Right(tuple) => tuple,
  316. };
  317. futures_vec = rest;
  318. cancel_token = new_token;
  319. if let Ok(Some(vote)) = one_vote {
  320. if vote {
  321. vote_count += 1
  322. } else {
  323. against_count += 1
  324. }
  325. }
  326. }
  327. if vote_count < quorum {
  328. return;
  329. }
  330. let mut rf = rf.lock();
  331. if rf.current_term == term && rf.state == State::Candidate {
  332. // We are the leader now. The election timer can be stopped.
  333. election.stop_election_timer();
  334. rf.state = State::Leader;
  335. rf.leader_id = me;
  336. let log_len = rf.log.end();
  337. for item in rf.next_index.iter_mut() {
  338. *item = log_len;
  339. }
  340. for item in rf.match_index.iter_mut() {
  341. *item = 0;
  342. }
  343. for item in rf.current_step.iter_mut() {
  344. *item = 0;
  345. }
  346. // Reset the verify authority daemon before sending heartbeats to
  347. // followers. This is critical to the correctness of verifying
  348. // authority.
  349. // No verity authority request can go through before the reset is
  350. // done, since we are holding the raft lock.
  351. verify_authority_daemon.reset_state(term);
  352. if rf.commit_index != rf.log.last_index_term().index {
  353. rf.sentinel_commit_index =
  354. rf.log.add_entry(term, LogEntryEnum::TermChange);
  355. persister.save_state(rf.persisted_state().into());
  356. } else {
  357. rf.sentinel_commit_index = rf.commit_index;
  358. }
  359. // Sync all logs now.
  360. let _ = new_log_entry.send(None);
  361. }
  362. }
  363. }