election.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. use std::sync::atomic::Ordering;
  2. use std::sync::Arc;
  3. use std::time::{Duration, Instant};
  4. use parking_lot::{Condvar, Mutex};
  5. use rand::{thread_rng, Rng};
  6. use crate::utils::{retry_rpc, RPC_DEADLINE};
  7. use crate::{Peer, Raft, RaftState, RequestVoteArgs, RpcClient, State, Term};
  8. #[derive(Default)]
  9. pub(crate) struct ElectionState {
  10. // Timer will be removed upon shutdown or elected.
  11. timer: Mutex<(usize, Option<Instant>)>,
  12. // Wake up the timer thread when the timer is reset or cancelled.
  13. signal: Condvar,
  14. }
  15. const ELECTION_TIMEOUT_BASE_MILLIS: u64 = 150;
  16. const ELECTION_TIMEOUT_VAR_MILLIS: u64 = 250;
  17. impl ElectionState {
  18. pub(crate) fn create() -> Self {
  19. Self {
  20. timer: Mutex::new((0, None)),
  21. signal: Condvar::new(),
  22. }
  23. }
  24. pub(crate) fn reset_election_timer(&self) {
  25. let mut guard = self.timer.lock();
  26. guard.0 += 1;
  27. guard.1.replace(Self::election_timeout());
  28. self.signal.notify_one();
  29. }
  30. fn try_reset_election_timer(&self, timer_count: usize) -> bool {
  31. let mut guard = self.timer.lock();
  32. if guard.0 != timer_count {
  33. return false;
  34. }
  35. guard.0 += 1;
  36. guard.1.replace(Self::election_timeout());
  37. self.signal.notify_one();
  38. true
  39. }
  40. fn election_timeout() -> Instant {
  41. Instant::now()
  42. + Duration::from_millis(
  43. ELECTION_TIMEOUT_BASE_MILLIS
  44. + thread_rng().gen_range(0..ELECTION_TIMEOUT_VAR_MILLIS),
  45. )
  46. }
  47. pub(crate) fn stop_election_timer(&self) {
  48. let mut guard = self.timer.lock();
  49. guard.0 += 1;
  50. guard.1.take();
  51. self.signal.notify_one();
  52. }
  53. }
  54. // Command must be
  55. // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
  56. // 1. clone: they are copied to the persister.
  57. // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
  58. // 3. serialize: they are converted to bytes to persist.
  59. impl<Command> Raft<Command>
  60. where
  61. Command: 'static + Clone + Send + serde::Serialize,
  62. {
  63. pub(crate) fn run_election_timer(&self) {
  64. let this = self.clone();
  65. let join_handle = std::thread::spawn(move || {
  66. // Note: do not change this to `let _ = ...`.
  67. let _guard = this.daemon_env.for_scope();
  68. let election = this.election.clone();
  69. let mut should_run = None;
  70. while this.keep_running.load(Ordering::SeqCst) {
  71. let mut cancel_handle = should_run
  72. .map(|last_timer_count| this.run_election(last_timer_count))
  73. .flatten();
  74. let mut guard = election.timer.lock();
  75. let (timer_count, deadline) = *guard;
  76. if let Some(last_timer_count) = should_run {
  77. // If the timer was changed more than once, we know the
  78. // last scheduled election should have been cancelled.
  79. if timer_count > last_timer_count + 1 {
  80. cancel_handle.take().map(|c| c.send(()));
  81. }
  82. }
  83. // check the running signal before sleeping. We are holding the
  84. // timer lock, so no one can change it. The kill() method will
  85. // not be able to notify this thread before `wait` is called.
  86. if !this.keep_running.load(Ordering::SeqCst) {
  87. break;
  88. }
  89. should_run = match deadline {
  90. Some(timeout) => loop {
  91. let ret =
  92. election.signal.wait_until(&mut guard, timeout);
  93. let fired = ret.timed_out() && Instant::now() > timeout;
  94. // If the timer has been updated, do not schedule,
  95. // break so that we could cancel.
  96. if timer_count != guard.0 {
  97. // Timer has been updated, cancel current
  98. // election, and block on timeout again.
  99. break None;
  100. } else if fired {
  101. // Timer has fired, remove the timer and allow
  102. // running the next election at timer_count.
  103. // If the next election is cancelled before we
  104. // are back on wait, timer_count will be set to
  105. // a different value.
  106. guard.0 += 1;
  107. guard.1.take();
  108. break Some(guard.0);
  109. }
  110. },
  111. None => {
  112. election.signal.wait(&mut guard);
  113. // The timeout has changed, check again.
  114. None
  115. }
  116. };
  117. drop(guard);
  118. // Whenever woken up, cancel the current running election.
  119. // There are 3 cases we could reach here
  120. // 1. We received an AppendEntries, or decided to vote for
  121. // a peer, and thus turned into a follower. In this case we'll
  122. // be notified by the election signal.
  123. // 2. We are a follower but didn't receive a heartbeat on time,
  124. // or we are a candidate but didn't not collect enough vote on
  125. // time. In this case we'll have a timeout.
  126. // 3. When become a leader, or are shutdown. In this case we'll
  127. // be notified by the election signal.
  128. cancel_handle.map(|c| c.send(()));
  129. }
  130. let stop_wait_group = this.stop_wait_group.clone();
  131. // Making sure the rest of `this` is dropped before the wait group.
  132. drop(this);
  133. drop(stop_wait_group);
  134. });
  135. self.daemon_env.watch_daemon(join_handle);
  136. }
  137. fn run_election(
  138. &self,
  139. timer_count: usize,
  140. ) -> Option<futures_channel::oneshot::Sender<()>> {
  141. let me = self.me;
  142. let (term, args) = {
  143. let mut rf = self.inner_state.lock();
  144. // The previous election is faster and reached the critical section
  145. // before us. We should stop and not run this election.
  146. // Or someone else increased the term and the timer is reset.
  147. if !self.election.try_reset_election_timer(timer_count) {
  148. return None;
  149. }
  150. rf.current_term.0 += 1;
  151. rf.voted_for = Some(me);
  152. rf.state = State::Candidate;
  153. self.persister.save_state(rf.persisted_state().into());
  154. let term = rf.current_term;
  155. let (last_log_index, last_log_term) =
  156. rf.log.last_index_term().unpack();
  157. (
  158. term,
  159. RequestVoteArgs {
  160. term,
  161. candidate_id: me,
  162. last_log_index,
  163. last_log_term,
  164. },
  165. )
  166. };
  167. let mut votes = vec![];
  168. for (index, rpc_client) in self.peers.iter().enumerate() {
  169. if index != self.me.0 {
  170. // RpcClient must be cloned so that it lives long enough for
  171. // spawn(), which requires static life time.
  172. let rpc_client = rpc_client.clone();
  173. // RPCs are started right away.
  174. let one_vote = self
  175. .thread_pool
  176. .spawn(Self::request_vote(rpc_client, args.clone()));
  177. votes.push(one_vote);
  178. }
  179. }
  180. let (tx, rx) = futures_channel::oneshot::channel();
  181. self.thread_pool.spawn(Self::count_vote_util_cancelled(
  182. me,
  183. term,
  184. self.inner_state.clone(),
  185. votes,
  186. rx,
  187. self.election.clone(),
  188. self.new_log_entry.clone().unwrap(),
  189. ));
  190. Some(tx)
  191. }
  192. const REQUEST_VOTE_RETRY: usize = 1;
  193. async fn request_vote(
  194. rpc_client: Arc<RpcClient>,
  195. args: RequestVoteArgs,
  196. ) -> Option<bool> {
  197. let term = args.term;
  198. // See the comment in send_heartbeat() for this override.
  199. let rpc_client = rpc_client.as_ref();
  200. let reply =
  201. retry_rpc(Self::REQUEST_VOTE_RETRY, RPC_DEADLINE, move |_round| {
  202. rpc_client.call_request_vote(args.clone())
  203. })
  204. .await;
  205. if let Ok(reply) = reply {
  206. return Some(reply.vote_granted && reply.term == term);
  207. }
  208. None
  209. }
  210. async fn count_vote_util_cancelled(
  211. me: Peer,
  212. term: Term,
  213. rf: Arc<Mutex<RaftState<Command>>>,
  214. votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
  215. cancel_token: futures_channel::oneshot::Receiver<()>,
  216. election: Arc<ElectionState>,
  217. new_log_entry: std::sync::mpsc::Sender<Option<Peer>>,
  218. ) {
  219. let quorum = votes.len() >> 1;
  220. let mut vote_count = 0;
  221. let mut against_count = 0;
  222. let mut cancel_token = cancel_token;
  223. let mut futures_vec = votes;
  224. while vote_count < quorum
  225. && against_count <= quorum
  226. && !futures_vec.is_empty()
  227. {
  228. // Mixing tokio futures with futures-rs ones. Fingers crossed.
  229. let selected = futures_util::future::select(
  230. cancel_token,
  231. futures_util::future::select_all(futures_vec),
  232. )
  233. .await;
  234. let ((one_vote, _, rest), new_token) = match selected {
  235. futures_util::future::Either::Left(_) => break,
  236. futures_util::future::Either::Right(tuple) => tuple,
  237. };
  238. futures_vec = rest;
  239. cancel_token = new_token;
  240. if let Ok(Some(vote)) = one_vote {
  241. if vote {
  242. vote_count += 1
  243. } else {
  244. against_count += 1
  245. }
  246. }
  247. }
  248. if vote_count < quorum {
  249. return;
  250. }
  251. let mut rf = rf.lock();
  252. if rf.current_term == term && rf.state == State::Candidate {
  253. // We are the leader now. The election timer can be stopped.
  254. election.stop_election_timer();
  255. rf.state = State::Leader;
  256. rf.leader_id = me;
  257. let log_len = rf.log.end();
  258. for item in rf.next_index.iter_mut() {
  259. *item = log_len;
  260. }
  261. for item in rf.match_index.iter_mut() {
  262. *item = 0;
  263. }
  264. for item in rf.current_step.iter_mut() {
  265. *item = 0;
  266. }
  267. // Sync all logs now.
  268. let _ = new_log_entry.send(None);
  269. }
  270. }
  271. }