lib.rs 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964
  1. extern crate bincode;
  2. extern crate futures_channel;
  3. extern crate futures_util;
  4. extern crate labrpc;
  5. extern crate rand;
  6. #[macro_use]
  7. extern crate serde_derive;
  8. extern crate tokio;
  9. use std::convert::TryFrom;
  10. use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
  11. use std::sync::Arc;
  12. use std::time::{Duration, Instant};
  13. use crossbeam_utils::sync::WaitGroup;
  14. use parking_lot::{Condvar, Mutex};
  15. use rand::{thread_rng, Rng};
  16. use crate::install_snapshot::InstallSnapshotArgs;
  17. use crate::persister::PersistedRaftState;
  18. pub use crate::persister::Persister;
  19. pub(crate) use crate::raft_state::RaftState;
  20. pub(crate) use crate::raft_state::State;
  21. pub use crate::rpcs::RpcClient;
  22. pub use crate::snapshot::Snapshot;
  23. use crate::snapshot::SnapshotDaemon;
  24. use crate::utils::retry_rpc;
  25. mod index_term;
  26. mod install_snapshot;
  27. mod log_array;
  28. mod persister;
  29. mod raft_state;
  30. pub mod rpcs;
  31. mod snapshot;
  32. pub mod utils;
/// A Raft term (election epoch). Totally ordered so terms can be compared
/// directly; wraps a plain counter.
#[derive(
    Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize,
)]
pub struct Term(pub usize);
/// Identifies a peer by its position in the `peers` vector of `Raft`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
struct Peer(usize);
  39. pub type Index = usize;
/// One replicated log entry: a command together with the term and index at
/// which it was appended.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct LogEntry<Command> {
    term: Term,
    index: Index,
    command: Command,
}
/// State shared between the election timer thread and RPC handlers.
struct ElectionState {
    // (version counter, deadline). The counter is bumped on every reset or
    // stop so the timer thread can detect changes while it sleeps.
    // Timer will be removed upon shutdown or elected.
    timer: Mutex<(usize, Option<Instant>)>,
    // Wake up the timer thread when the timer is reset or cancelled.
    signal: Condvar,
}
/// A single Raft instance. Cheap to clone: every field is a shared handle,
/// so clones observe and mutate the same underlying instance.
#[derive(Clone)]
pub struct Raft<Command> {
    // Core protocol state (term, vote, log, indexes) behind one mutex.
    inner_state: Arc<Mutex<RaftState<Command>>>,
    // RPC clients for all peers, including an entry for self (skipped when
    // sending).
    peers: Vec<Arc<RpcClient>>,
    // This instance's position in `peers`.
    me: Peer,
    // Durable storage for term / vote / log.
    persister: Arc<dyn Persister>,
    // Wakes the log-sync daemon; installed by run_log_entry_daemon().
    new_log_entry: Option<std::sync::mpsc::Sender<Option<Peer>>>,
    // Wakes the apply-command daemon when commit_index advances.
    apply_command_signal: Arc<Condvar>,
    // Cleared by kill(); daemons poll this flag to exit.
    keep_running: Arc<AtomicBool>,
    // Election timer state shared with the timer thread.
    election: Arc<ElectionState>,
    snapshot_daemon: SnapshotDaemon,
    // Tokio runtime for RPC tasks; torn down in kill().
    thread_pool: Arc<tokio::runtime::Runtime>,
    // Lets kill() wait for the daemon threads to finish.
    stop_wait_group: WaitGroup,
}
/// RequestVote RPC request.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct RequestVoteArgs {
    term: Term,
    candidate_id: Peer,
    // Index and term of the candidate's last log entry; voters use these to
    // check that the candidate's log is at least as up-to-date as theirs.
    last_log_index: Index,
    last_log_term: Term,
}
/// RequestVote RPC response.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct RequestVoteReply {
    // The term the responder acted in, so the candidate can detect staleness.
    term: Term,
    vote_granted: bool,
}
/// AppendEntries RPC request; sent with empty `entries` as a heartbeat.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AppendEntriesArgs<Command> {
    term: Term,
    leader_id: Peer,
    // Index/term of the entry immediately before `entries`, used for the
    // follower's log-consistency check.
    prev_log_index: Index,
    prev_log_term: Term,
    entries: Vec<LogEntry<Command>>,
    // Leader's commit index, letting followers advance their own.
    leader_commit: Index,
}
/// AppendEntries RPC response.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AppendEntriesReply {
    // The term the responder acted in, so the leader can detect staleness.
    term: Term,
    // True when the consistency check passed and entries were stored.
    success: bool,
}
// Per-peer pending-request counter used to coalesce log-sync tasks: a new
// task is only spawned when the counter transitions from zero (see
// run_log_entry_daemon / sync_log_entry). Cache-line aligned to avoid false
// sharing between adjacent peers' counters.
#[repr(align(64))]
struct Opening(Arc<AtomicUsize>);
  94. // Commands must be
  95. // 0. 'static: they have to live long enough for thread pools.
  96. // 1. clone: they are put in vectors and request messages.
  97. // 2. serializable: they are sent over RPCs and persisted.
  98. // 3. deserializable: they are restored from storage.
  99. // 4. send: they are referenced in futures.
  100. // 5. default, because we need an element for the first entry.
impl<Command> Raft<Command>
where
    Command: 'static
        + Clone
        + serde::Serialize
        + serde::de::DeserializeOwned
        + Send
        + Default,
{
    /// Create a new raft instance.
    ///
    /// Each instance will create at least 3 + (number of peers) threads. The
    /// extensive usage of threads is to minimize latency.
    ///
    /// `apply_command` is invoked, on a dedicated thread, once per committed
    /// entry in log order. `request_snapshot` is handed to the snapshot
    /// daemon (implemented elsewhere; presumably invoked when the persisted
    /// state grows past `max_state_size_bytes` — confirm in snapshot.rs).
    pub fn new<ApplyCommandFunc, RequestSnapshotFunc>(
        peers: Vec<RpcClient>,
        me: usize,
        persister: Arc<dyn Persister>,
        apply_command: ApplyCommandFunc,
        max_state_size_bytes: Option<usize>,
        request_snapshot: RequestSnapshotFunc,
    ) -> Self
    where
        ApplyCommandFunc: 'static + Send + FnMut(Index, Command),
        RequestSnapshotFunc: 'static + Send + FnMut(Index) -> Snapshot,
    {
        let peer_size = peers.len();
        // Fresh follower state: term 0, empty log, nothing applied.
        let mut state = RaftState {
            current_term: Term(0),
            voted_for: None,
            log: log_array::LogArray::create(),
            commit_index: 0,
            last_applied: 0,
            next_index: vec![1; peer_size],
            match_index: vec![0; peer_size],
            current_step: vec![0; peer_size],
            state: State::Follower,
            leader_id: Peer(me),
        };
        // Restore term, vote and log from stable storage when present; a
        // failed parse (e.g. first boot with empty storage) keeps the fresh
        // state above.
        if let Ok(persisted_state) =
            PersistedRaftState::try_from(persister.read_state())
        {
            state.current_term = persisted_state.current_term;
            state.voted_for = persisted_state.voted_for;
            state.log =
                log_array::LogArray::restore(persisted_state.log).unwrap();
        }
        let election = ElectionState {
            timer: Mutex::new((0, None)),
            signal: Condvar::new(),
        };
        // Arm the timer before the timer thread exists; the thread reads the
        // deadline when it starts.
        election.reset_election_timer();
        let thread_pool = tokio::runtime::Builder::new_multi_thread()
            .enable_time()
            .thread_name(format!("raft-instance-{}", me))
            .worker_threads(peer_size)
            .build()
            .expect("Creating thread pool should not fail");
        let peers = peers.into_iter().map(Arc::new).collect();
        let mut this = Raft {
            inner_state: Arc::new(Mutex::new(state)),
            peers,
            me: Peer(me),
            persister,
            new_log_entry: None,
            apply_command_signal: Arc::new(Default::default()),
            keep_running: Arc::new(Default::default()),
            election: Arc::new(election),
            snapshot_daemon: Default::default(),
            thread_pool: Arc::new(thread_pool),
            stop_wait_group: WaitGroup::new(),
        };
        this.keep_running.store(true, Ordering::SeqCst);
        // Running in a standalone thread.
        this.run_log_entry_daemon();
        // Running in a standalone thread.
        this.run_apply_command_daemon(apply_command);
        // One off function that schedules many little tasks, running on the
        // internal thread pool.
        this.schedule_heartbeats(Duration::from_millis(
            HEARTBEAT_INTERVAL_MILLIS,
        ));
        // The last step is to start running election timer.
        this.run_election_timer();
        this.run_snapshot_daemon(max_state_size_bytes, request_snapshot);
        this
    }
}
  188. // Command must be
  189. // 1. clone: they are copied to the persister.
  190. // 2. serialize: they are converted to bytes to persist.
  191. // 3. default: a default value is used as the first element of the log.
impl<Command> Raft<Command>
where
    Command: Clone + serde::Serialize + Default,
{
    /// Handle a RequestVote RPC.
    ///
    /// Grants the vote when we have not voted for a different candidate in
    /// this term and the candidate's log is at least as up-to-date as ours
    /// (compared by last term, then last index). Any term or vote change is
    /// persisted before replying.
    pub(crate) fn process_request_vote(
        &self,
        args: RequestVoteArgs,
    ) -> RequestVoteReply {
        let mut rf = self.inner_state.lock();
        let term = rf.current_term;
        #[allow(clippy::comparison_chain)]
        if args.term < term {
            // Stale candidate: reject and report our newer term.
            return RequestVoteReply {
                term,
                vote_granted: false,
            };
        } else if args.term > term {
            // Newer term observed: adopt it, clear our vote, step down.
            rf.current_term = args.term;
            rf.voted_for = None;
            rf.state = State::Follower;
            self.election.reset_election_timer();
            self.persister.save_state(rf.persisted_state().into());
        }
        let voted_for = rf.voted_for;
        let last_log = rf.log.last_index_term();
        // Election restriction: only vote for a candidate whose log is at
        // least as up-to-date as ours.
        if (voted_for.is_none() || voted_for == Some(args.candidate_id))
            && (args.last_log_term > last_log.term
                || (args.last_log_term == last_log.term
                    && args.last_log_index >= last_log.index))
        {
            rf.voted_for = Some(args.candidate_id);
            // It is possible that we have set a timer above when updating the
            // current term. It does not hurt to update the timer again.
            // We do need to persist, though.
            self.election.reset_election_timer();
            self.persister.save_state(rf.persisted_state().into());
            RequestVoteReply {
                term: args.term,
                vote_granted: true,
            }
        } else {
            RequestVoteReply {
                term: args.term,
                vote_granted: false,
            }
        }
    }
    /// Handle an AppendEntries RPC (replication or heartbeat).
    ///
    /// Rejects stale terms and failed log-consistency checks. Otherwise
    /// reconciles the local log with `args.entries` (truncating at the first
    /// conflicting entry), persists the result, and advances the commit
    /// index up to `leader_commit`, bounded by the local log.
    pub(crate) fn process_append_entries(
        &self,
        args: AppendEntriesArgs<Command>,
    ) -> AppendEntriesReply {
        let mut rf = self.inner_state.lock();
        if rf.current_term > args.term {
            // Stale leader: reject and report our newer term.
            return AppendEntriesReply {
                term: rf.current_term,
                success: false,
            };
        }
        if rf.current_term < args.term {
            rf.current_term = args.term;
            rf.voted_for = None;
            self.persister.save_state(rf.persisted_state().into());
        }
        // A valid AppendEntries means there is a live leader for this term.
        rf.state = State::Follower;
        rf.leader_id = args.leader_id;
        self.election.reset_election_timer();
        // Consistency check: we must hold the entry at prev_log_index with
        // the matching term, otherwise the leader must back up.
        if rf.log.end() <= args.prev_log_index
            || rf.log[args.prev_log_index].term != args.prev_log_term
        {
            return AppendEntriesReply {
                term: args.term,
                success: false,
            };
        }
        // Append new entries, truncating our log at the first term conflict.
        for (i, entry) in args.entries.iter().enumerate() {
            let index = i + args.prev_log_index + 1;
            if rf.log.end() > index {
                if rf.log[index].term != entry.term {
                    rf.log.truncate(index);
                    rf.log.push(entry.clone());
                }
            } else {
                rf.log.push(entry.clone());
            }
        }
        self.persister.save_state(rf.persisted_state().into());
        if args.leader_commit > rf.commit_index {
            // Advance commit to the leader's commit index, bounded by the
            // end of our own log, and wake the apply daemon.
            rf.commit_index = if args.leader_commit < rf.log.end() {
                args.leader_commit
            } else {
                rf.log.last_index_term().index
            };
            self.apply_command_signal.notify_one();
        }
        AppendEntriesReply {
            term: args.term,
            success: true,
        }
    }
}
/// What to send a peer during log sync: normal entries, a snapshot for a
/// peer that has fallen behind the retained log, or nothing (e.g. when we
/// are no longer the leader).
enum SyncLogEntryOperation<Command> {
    AppendEntries(AppendEntriesArgs<Command>),
    InstallSnapshot(InstallSnapshotArgs),
    None,
}
  297. // Command must be
  298. // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
  299. // 1. clone: they are copied to the persister.
  300. // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
  301. // 3. serialize: they are converted to bytes to persist.
  302. // 4. default: a default value is used as the first element of log.
  303. impl<Command> Raft<Command>
  304. where
  305. Command: 'static + Clone + Send + serde::Serialize + Default,
  306. {
    /// Start the election timer thread.
    ///
    /// The thread sleeps on the shared deadline in `self.election` and starts
    /// an election when the deadline fires. The timer's version counter lets
    /// it detect resets/stops that happened while it slept, and cancel the
    /// election it scheduled for a now-stale deadline.
    fn run_election_timer(&self) -> std::thread::JoinHandle<()> {
        let this = self.clone();
        std::thread::spawn(move || {
            let election = this.election.clone();
            // The timer version at which the next election should run, set
            // when a deadline fires.
            let mut should_run = None;
            while this.keep_running.load(Ordering::SeqCst) {
                let mut cancel_handle = should_run
                    .map(|last_timer_count| this.run_election(last_timer_count))
                    .flatten();
                let mut guard = election.timer.lock();
                let (timer_count, deadline) = *guard;
                if let Some(last_timer_count) = should_run {
                    // If the timer was changed more than once, we know the
                    // last scheduled election should have been cancelled.
                    if timer_count > last_timer_count + 1 {
                        cancel_handle.take().map(|c| c.send(()));
                    }
                }
                // check the running signal before sleeping. We are holding the
                // timer lock, so no one can change it. The kill() method will
                // not be able to notify this thread before `wait` is called.
                if !this.keep_running.load(Ordering::SeqCst) {
                    break;
                }
                should_run = match deadline {
                    Some(timeout) => loop {
                        let ret =
                            election.signal.wait_until(&mut guard, timeout);
                        let fired = ret.timed_out() && Instant::now() > timeout;
                        // If the timer has been updated, do not schedule,
                        // break so that we could cancel.
                        if timer_count != guard.0 {
                            // Timer has been updated, cancel current
                            // election, and block on timeout again.
                            break None;
                        } else if fired {
                            // Timer has fired, remove the timer and allow
                            // running the next election at timer_count.
                            // If the next election is cancelled before we
                            // are back on wait, timer_count will be set to
                            // a different value.
                            guard.0 += 1;
                            guard.1.take();
                            break Some(guard.0);
                        }
                    },
                    None => {
                        election.signal.wait(&mut guard);
                        // The timeout has changed, check again.
                        None
                    }
                };
                drop(guard);
                // Whenever woken up, cancel the current running election.
                // There are 3 cases we could reach here
                // 1. We received an AppendEntries, or decided to vote for
                // a peer, and thus turned into a follower. In this case we'll
                // be notified by the election signal.
                // 2. We are a follower but didn't receive a heartbeat on time,
                // or we are a candidate but didn't not collect enough vote on
                // time. In this case we'll have a timeout.
                // 3. When become a leader, or are shutdown. In this case we'll
                // be notified by the election signal.
                cancel_handle.map(|c| c.send(()));
            }
            let stop_wait_group = this.stop_wait_group.clone();
            // Making sure the rest of `this` is dropped before the wait group.
            drop(this);
            drop(stop_wait_group);
        })
    }
    /// Start an election for a new term, provided the timer at `timer_count`
    /// is still current.
    ///
    /// Becomes a candidate (bumps the term, votes for self, persists), fires
    /// RequestVote RPCs at every other peer, and spawns the vote-counting
    /// task. Returns a handle that cancels the vote counting, or `None` if
    /// the election was superseded before it started.
    fn run_election(
        &self,
        timer_count: usize,
    ) -> Option<futures_channel::oneshot::Sender<()>> {
        let me = self.me;
        let (term, args) = {
            let mut rf = self.inner_state.lock();
            // The previous election is faster and reached the critical section
            // before us. We should stop and not run this election.
            // Or someone else increased the term and the timer is reset.
            if !self.election.try_reset_election_timer(timer_count) {
                return None;
            }
            rf.current_term.0 += 1;
            rf.voted_for = Some(me);
            rf.state = State::Candidate;
            // Persist the new term and self-vote before sending any RPCs.
            self.persister.save_state(rf.persisted_state().into());
            let term = rf.current_term;
            let (last_log_index, last_log_term) =
                rf.log.last_index_term().unpack();
            (
                term,
                RequestVoteArgs {
                    term,
                    candidate_id: me,
                    last_log_index,
                    last_log_term,
                },
            )
        };
        let mut votes = vec![];
        for (index, rpc_client) in self.peers.iter().enumerate() {
            if index != self.me.0 {
                // RpcClient must be cloned so that it lives long enough for
                // spawn(), which requires static life time.
                let rpc_client = rpc_client.clone();
                // RPCs are started right away.
                let one_vote = self
                    .thread_pool
                    .spawn(Self::request_vote(rpc_client, args.clone()));
                votes.push(one_vote);
            }
        }
        let (tx, rx) = futures_channel::oneshot::channel();
        self.thread_pool.spawn(Self::count_vote_util_cancelled(
            me,
            term,
            self.inner_state.clone(),
            votes,
            rx,
            self.election.clone(),
            self.new_log_entry.clone().unwrap(),
        ));
        Some(tx)
    }
    const REQUEST_VOTE_RETRY: usize = 1;
    /// Send one RequestVote RPC (with retry) and interpret the reply.
    ///
    /// Returns `Some(true)` for a vote granted in the requested term,
    /// `Some(false)` for a rejection or a reply from another term, and
    /// `None` when the RPC ultimately failed.
    async fn request_vote(
        rpc_client: Arc<RpcClient>,
        args: RequestVoteArgs,
    ) -> Option<bool> {
        let term = args.term;
        // See the comment in send_heartbeat() for this override.
        let rpc_client = rpc_client.as_ref();
        let reply =
            retry_rpc(Self::REQUEST_VOTE_RETRY, RPC_DEADLINE, move |_round| {
                rpc_client.call_request_vote(args.clone())
            })
            .await;
        if let Ok(reply) = reply {
            return Some(reply.vote_granted && reply.term == term);
        }
        None
    }
  451. async fn count_vote_util_cancelled(
  452. me: Peer,
  453. term: Term,
  454. rf: Arc<Mutex<RaftState<Command>>>,
  455. votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
  456. cancel_token: futures_channel::oneshot::Receiver<()>,
  457. election: Arc<ElectionState>,
  458. new_log_entry: std::sync::mpsc::Sender<Option<Peer>>,
  459. ) {
  460. let quorum = votes.len() >> 1;
  461. let mut vote_count = 0;
  462. let mut against_count = 0;
  463. let mut cancel_token = cancel_token;
  464. let mut futures_vec = votes;
  465. while vote_count < quorum
  466. && against_count <= quorum
  467. && !futures_vec.is_empty()
  468. {
  469. // Mixing tokio futures with futures-rs ones. Fingers crossed.
  470. let selected = futures_util::future::select(
  471. cancel_token,
  472. futures_util::future::select_all(futures_vec),
  473. )
  474. .await;
  475. let ((one_vote, _, rest), new_token) = match selected {
  476. futures_util::future::Either::Left(_) => break,
  477. futures_util::future::Either::Right(tuple) => tuple,
  478. };
  479. futures_vec = rest;
  480. cancel_token = new_token;
  481. if let Ok(Some(vote)) = one_vote {
  482. if vote {
  483. vote_count += 1
  484. } else {
  485. against_count += 1
  486. }
  487. }
  488. }
  489. if vote_count < quorum {
  490. return;
  491. }
  492. let mut rf = rf.lock();
  493. if rf.current_term == term && rf.state == State::Candidate {
  494. // We are the leader now. The election timer can be stopped.
  495. election.stop_election_timer();
  496. rf.state = State::Leader;
  497. rf.leader_id = me;
  498. let log_len = rf.log.end();
  499. for item in rf.next_index.iter_mut() {
  500. *item = log_len;
  501. }
  502. for item in rf.match_index.iter_mut() {
  503. *item = 0;
  504. }
  505. for item in rf.current_step.iter_mut() {
  506. *item = 0;
  507. }
  508. // Sync all logs now.
  509. let _ = new_log_entry.send(None);
  510. }
  511. }
    /// Schedule one periodic heartbeat task per peer on the internal thread
    /// pool. Each task ticks at `interval` and sends an empty AppendEntries
    /// whenever this instance currently believes it is the leader.
    fn schedule_heartbeats(&self, interval: Duration) {
        for (peer_index, rpc_client) in self.peers.iter().enumerate() {
            if peer_index != self.me.0 {
                // rf is now owned by the outer async function.
                let rf = self.inner_state.clone();
                // RPC client must be cloned into the outer async function.
                let rpc_client = rpc_client.clone();
                // Shutdown signal.
                let keep_running = self.keep_running.clone();
                self.thread_pool.spawn(async move {
                    let mut interval = tokio::time::interval(interval);
                    while keep_running.load(Ordering::SeqCst) {
                        interval.tick().await;
                        // build_heartbeat() returns None when we are not the
                        // leader; then this tick is silently skipped.
                        if let Some(args) = Self::build_heartbeat(&rf) {
                            tokio::spawn(Self::send_heartbeat(
                                rpc_client.clone(),
                                args,
                            ));
                        }
                    }
                });
            }
        }
    }
  536. fn build_heartbeat(
  537. rf: &Mutex<RaftState<Command>>,
  538. ) -> Option<AppendEntriesArgs<Command>> {
  539. let rf = rf.lock();
  540. if !rf.is_leader() {
  541. return None;
  542. }
  543. let last_log = rf.log.last_index_term();
  544. let args = AppendEntriesArgs {
  545. term: rf.current_term,
  546. leader_id: rf.leader_id,
  547. prev_log_index: last_log.index,
  548. prev_log_term: last_log.term,
  549. entries: vec![],
  550. leader_commit: rf.commit_index,
  551. };
  552. Some(args)
  553. }
    const HEARTBEAT_RETRY: usize = 1;
    /// Send one (empty) AppendEntries heartbeat; the reply body is ignored,
    /// only the RPC error (if any) is propagated.
    async fn send_heartbeat(
        rpc_client: Arc<RpcClient>,
        args: AppendEntriesArgs<Command>,
    ) -> std::io::Result<()> {
        // Passing a reference that is moved to the following closure.
        //
        // It won't work if the rpc_client of type Arc is moved into the closure
        // directly. To clone the Arc, the function must own a mutable reference
        // to it. At the same time, rpc_client.call_append_entries() returns a
        // future that must own a reference, too. That caused a compiling error
        // of FnMut allowing "references to captured variables to escape".
        //
        // By passing-in a reference instead of an Arc, the closure becomes a Fn
        // (no Mut), which can allow references to escape.
        //
        // Another option is to use non-move closures, in which case rpc_client
        // of type Arc can be passed-in directly. However that requires args to
        // be sync because they can be shared by more than one futures.
        let rpc_client = rpc_client.as_ref();
        retry_rpc(Self::HEARTBEAT_RETRY, RPC_DEADLINE, move |_round| {
            rpc_client.call_append_entries(args.clone())
        })
        .await?;
        Ok(())
    }
    /// Start the daemon thread that fans out log-sync tasks to peers.
    ///
    /// Installs the `new_log_entry` channel on `self`. A message of
    /// `Some(peer)` requests syncing only that peer; `None` requests syncing
    /// every peer. Requests are coalesced per peer with the `Opening`
    /// counters: a new task is spawned only when the counter was zero.
    fn run_log_entry_daemon(&mut self) -> std::thread::JoinHandle<()> {
        let (tx, rx) = std::sync::mpsc::channel::<Option<Peer>>();
        self.new_log_entry.replace(tx);
        // Clone everything that the thread needs.
        let this = self.clone();
        std::thread::spawn(move || {
            let mut openings = vec![];
            openings.resize_with(this.peers.len(), || {
                Opening(Arc::new(AtomicUsize::new(0)))
            });
            let openings = openings; // Not mutable beyond this point.
            while let Ok(peer) = rx.recv() {
                if !this.keep_running.load(Ordering::SeqCst) {
                    break;
                }
                // Only the leader pushes entries to peers.
                if !this.inner_state.lock().is_leader() {
                    continue;
                }
                for (i, rpc_client) in this.peers.iter().enumerate() {
                    if i != this.me.0 && peer.map(|p| p.0 == i).unwrap_or(true)
                    {
                        // Only schedule a new task if the last task has cleared
                        // the queue of RPC requests.
                        if openings[i].0.fetch_add(1, Ordering::SeqCst) == 0 {
                            this.thread_pool.spawn(Self::sync_log_entry(
                                this.inner_state.clone(),
                                rpc_client.clone(),
                                i,
                                this.new_log_entry.clone().unwrap(),
                                openings[i].0.clone(),
                                this.apply_command_signal.clone(),
                            ));
                        }
                    }
                }
            }
            let stop_wait_group = this.stop_wait_group.clone();
            // Making sure the rest of `this` is dropped before the wait group.
            drop(this);
            drop(stop_wait_group);
        })
    }
  622. async fn sync_log_entry(
  623. rf: Arc<Mutex<RaftState<Command>>>,
  624. rpc_client: Arc<RpcClient>,
  625. peer_index: usize,
  626. rerun: std::sync::mpsc::Sender<Option<Peer>>,
  627. opening: Arc<AtomicUsize>,
  628. apply_command_signal: Arc<Condvar>,
  629. ) {
  630. if opening.swap(0, Ordering::SeqCst) == 0 {
  631. return;
  632. }
  633. let operation = Self::build_sync_log_entry(&rf, peer_index);
  634. let (term, match_index, succeeded) = match operation {
  635. SyncLogEntryOperation::AppendEntries(args) => {
  636. let term = args.term;
  637. let match_index = args.prev_log_index + args.entries.len();
  638. let succeeded = Self::append_entries(&rpc_client, args).await;
  639. (term, match_index, succeeded)
  640. }
  641. SyncLogEntryOperation::InstallSnapshot(args) => {
  642. let term = args.term;
  643. let match_index = args.last_included_index;
  644. let succeeded =
  645. Self::send_install_snapshot(&rpc_client, args).await;
  646. (term, match_index, succeeded)
  647. }
  648. SyncLogEntryOperation::None => return,
  649. };
  650. match succeeded {
  651. Ok(Some(true)) => {
  652. let mut rf = rf.lock();
  653. if rf.current_term != term {
  654. return;
  655. }
  656. rf.next_index[peer_index] = match_index + 1;
  657. rf.current_step[peer_index] = 0;
  658. if match_index > rf.match_index[peer_index] {
  659. rf.match_index[peer_index] = match_index;
  660. if rf.is_leader() && rf.current_term == term {
  661. let mut matched = rf.match_index.to_vec();
  662. let mid = matched.len() / 2 + 1;
  663. matched.sort_unstable();
  664. let new_commit_index = matched[mid];
  665. if new_commit_index > rf.commit_index
  666. && rf.log[new_commit_index].term == rf.current_term
  667. {
  668. rf.commit_index = new_commit_index;
  669. apply_command_signal.notify_one();
  670. }
  671. }
  672. }
  673. }
  674. Ok(Some(false)) => {
  675. let mut rf = rf.lock();
  676. let step = &mut rf.current_step[peer_index];
  677. if *step < 5 {
  678. *step += 1;
  679. }
  680. let diff = 4 << *step;
  681. let next_index = &mut rf.next_index[peer_index];
  682. if diff >= *next_index {
  683. *next_index = 1usize;
  684. } else {
  685. *next_index -= diff;
  686. }
  687. // Ignore the error. The log syncing thread must have died.
  688. let _ = rerun.send(Some(Peer(peer_index)));
  689. }
  690. // Do nothing, not our term anymore.
  691. Ok(None) => {}
  692. Err(_) => {
  693. tokio::time::sleep(Duration::from_millis(
  694. HEARTBEAT_INTERVAL_MILLIS,
  695. ))
  696. .await;
  697. // Ignore the error. The log syncing thread must have died.
  698. let _ = rerun.send(Some(Peer(peer_index)));
  699. }
  700. };
  701. }
  702. fn build_sync_log_entry(
  703. rf: &Mutex<RaftState<Command>>,
  704. peer_index: usize,
  705. ) -> SyncLogEntryOperation<Command> {
  706. let rf = rf.lock();
  707. if !rf.is_leader() {
  708. return SyncLogEntryOperation::None;
  709. }
  710. // To send AppendEntries request, next_index must be strictly larger
  711. // than start(). Otherwise we won't be able to know the log term of the
  712. // entry right before next_index.
  713. return if rf.next_index[peer_index] > rf.log.start() {
  714. SyncLogEntryOperation::AppendEntries(Self::build_append_entries(
  715. &rf, peer_index,
  716. ))
  717. } else {
  718. SyncLogEntryOperation::InstallSnapshot(
  719. Self::build_install_snapshot(&rf),
  720. )
  721. };
  722. }
  723. fn build_append_entries(
  724. rf: &RaftState<Command>,
  725. peer_index: usize,
  726. ) -> AppendEntriesArgs<Command> {
  727. let prev_log_index = rf.next_index[peer_index] - 1;
  728. let prev_log_term = rf.log[prev_log_index].term;
  729. AppendEntriesArgs {
  730. term: rf.current_term,
  731. leader_id: rf.leader_id,
  732. prev_log_index,
  733. prev_log_term,
  734. entries: rf.log.after(rf.next_index[peer_index]).to_vec(),
  735. leader_commit: rf.commit_index,
  736. }
  737. }
    const APPEND_ENTRIES_RETRY: usize = 1;
    /// Send one AppendEntries RPC (with retry).
    ///
    /// Returns `Ok(Some(success))` when the reply belongs to the requested
    /// term, `Ok(None)` when the responder is in a different term, and `Err`
    /// when the RPC ultimately failed.
    async fn append_entries(
        rpc_client: &RpcClient,
        args: AppendEntriesArgs<Command>,
    ) -> std::io::Result<Option<bool>> {
        let term = args.term;
        let reply = retry_rpc(
            Self::APPEND_ENTRIES_RETRY,
            RPC_DEADLINE,
            move |_round| rpc_client.call_append_entries(args.clone()),
        )
        .await?;
        Ok(if reply.term == term {
            Some(reply.success)
        } else {
            None
        })
    }
    /// Start the daemon thread that applies committed commands, in log
    /// order, by invoking `apply_command` outside of the state lock.
    ///
    /// The thread sleeps on `apply_command_signal` (bounded by the heartbeat
    /// interval so it re-checks `keep_running`), clones the committed-but-
    /// unapplied commands under the lock, then releases the lock before
    /// running the callback. It also pokes the snapshot daemon after every
    /// applied command.
    fn run_apply_command_daemon<Func>(
        &self,
        mut apply_command: Func,
    ) -> std::thread::JoinHandle<()>
    where
        Func: 'static + Send + FnMut(Index, Command),
    {
        let keep_running = self.keep_running.clone();
        let rf = self.inner_state.clone();
        let condvar = self.apply_command_signal.clone();
        let snapshot_daemon = self.snapshot_daemon.clone();
        let stop_wait_group = self.stop_wait_group.clone();
        std::thread::spawn(move || {
            while keep_running.load(Ordering::SeqCst) {
                let (mut index, commands) = {
                    let mut rf = rf.lock();
                    // Nothing to apply: wait for a commit-index bump, but at
                    // most one heartbeat interval before re-checking.
                    if rf.last_applied >= rf.commit_index {
                        condvar.wait_for(
                            &mut rf,
                            Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS),
                        );
                    }
                    if rf.last_applied < rf.commit_index {
                        let index = rf.last_applied + 1;
                        let last_one = rf.commit_index + 1;
                        // Clone the slice so the lock can be dropped before
                        // running external callbacks.
                        let commands: Vec<Command> = rf
                            .log
                            .between(index, last_one)
                            .iter()
                            .map(|entry| entry.command.clone())
                            .collect();
                        rf.last_applied = rf.commit_index;
                        (index, commands)
                    } else {
                        continue;
                    }
                };
                // Release the lock while calling external functions.
                for command in commands {
                    apply_command(index, command);
                    snapshot_daemon.trigger();
                    index += 1;
                }
            }
            drop(stop_wait_group);
        })
    }
  803. pub fn start(&self, command: Command) -> Option<(Term, Index)> {
  804. let mut rf = self.inner_state.lock();
  805. let term = rf.current_term;
  806. if !rf.is_leader() {
  807. return None;
  808. }
  809. let index = rf.log.add_command(term, command);
  810. self.persister.save_state(rf.persisted_state().into());
  811. let _ = self.new_log_entry.clone().unwrap().send(None);
  812. Some((term, index))
  813. }
    /// Shut this instance down, consuming `self`.
    ///
    /// Clears the running flag, wakes every daemon so it can observe the
    /// flag, waits (via the wait group) for the daemon threads to drop their
    /// clones, and finally tears down the RPC thread pool. All other clones
    /// of this Raft must have been dropped first, otherwise unwrapping the
    /// thread pool Arc panics.
    pub fn kill(mut self) {
        self.keep_running.store(false, Ordering::SeqCst);
        self.election.stop_election_timer();
        // Wake the log-sync daemon; dropping the sender also closes the
        // channel.
        self.new_log_entry.take().map(|n| n.send(None));
        self.apply_command_signal.notify_all();
        self.snapshot_daemon.trigger();
        // Block until every daemon thread has released its clone of `self`.
        self.stop_wait_group.wait();
        std::sync::Arc::try_unwrap(self.thread_pool)
            .expect(
                "All references to the thread pool should have been dropped.",
            )
            .shutdown_timeout(Duration::from_millis(
                HEARTBEAT_INTERVAL_MILLIS * 2,
            ));
    }
  829. pub fn get_state(&self) -> (Term, bool) {
  830. let state = self.inner_state.lock();
  831. (state.current_term, state.is_leader())
  832. }
  833. }
/// Interval between leader heartbeats; also used as the polling bound of the
/// apply daemon and the retry delay after a failed sync RPC.
const HEARTBEAT_INTERVAL_MILLIS: u64 = 150;
/// Election timeout is BASE + uniform random [0, VAR) milliseconds.
const ELECTION_TIMEOUT_BASE_MILLIS: u64 = 150;
const ELECTION_TIMEOUT_VAR_MILLIS: u64 = 250;
/// Deadline passed to retry_rpc() for each outgoing RPC.
const RPC_DEADLINE: Duration = Duration::from_secs(2);
impl ElectionState {
    /// Bump the timer version and arm a fresh randomized deadline, waking
    /// the timer thread.
    fn reset_election_timer(&self) {
        let mut guard = self.timer.lock();
        guard.0 += 1;
        guard.1.replace(Self::election_timeout());
        self.signal.notify_one();
    }
    /// Like `reset_election_timer`, but only when the timer version still
    /// equals `timer_count`; returns false when another reset won the race.
    fn try_reset_election_timer(&self, timer_count: usize) -> bool {
        let mut guard = self.timer.lock();
        if guard.0 != timer_count {
            return false;
        }
        guard.0 += 1;
        guard.1.replace(Self::election_timeout());
        self.signal.notify_one();
        true
    }
    /// A deadline BASE + uniform [0, VAR) milliseconds from now; the random
    /// jitter de-synchronizes elections across peers.
    fn election_timeout() -> Instant {
        Instant::now()
            + Duration::from_millis(
                ELECTION_TIMEOUT_BASE_MILLIS
                    + thread_rng().gen_range(0..ELECTION_TIMEOUT_VAR_MILLIS),
            )
    }
    /// Disarm the deadline (used on shutdown and when elected leader) and
    /// wake the timer thread.
    fn stop_election_timer(&self) {
        let mut guard = self.timer.lock();
        guard.0 += 1;
        guard.1.take();
        self.signal.notify_one();
    }
}