//! Raft consensus implementation: leader election, log replication and
//! snapshot daemons.
  1. extern crate bincode;
  2. extern crate futures_channel;
  3. extern crate futures_util;
  4. extern crate labrpc;
  5. extern crate rand;
  6. #[macro_use]
  7. extern crate serde_derive;
  8. extern crate tokio;
  9. use std::convert::TryFrom;
  10. use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
  11. use std::sync::Arc;
  12. use std::time::{Duration, Instant};
  13. use crossbeam_utils::sync::WaitGroup;
  14. use parking_lot::{Condvar, Mutex};
  15. use rand::{thread_rng, Rng};
  16. use crate::apply_command::ApplyCommandFnMut;
  17. pub use crate::apply_command::ApplyCommandMessage;
  18. use crate::index_term::IndexTerm;
  19. use crate::install_snapshot::InstallSnapshotArgs;
  20. use crate::persister::PersistedRaftState;
  21. pub use crate::persister::Persister;
  22. pub(crate) use crate::raft_state::RaftState;
  23. pub(crate) use crate::raft_state::State;
  24. pub use crate::rpcs::RpcClient;
  25. pub use crate::snapshot::Snapshot;
  26. use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
  27. use crate::utils::retry_rpc;
  28. mod apply_command;
  29. mod index_term;
  30. mod install_snapshot;
  31. mod log_array;
  32. mod persister;
  33. mod raft_state;
  34. pub mod rpcs;
  35. mod snapshot;
  36. pub mod utils;
/// A Raft term number. Terms only ever increase; the derived `Ord` /
/// `PartialOrd` let callers compare terms directly.
#[derive(
    Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize,
)]
pub struct Term(pub usize);
/// Identifier of a Raft instance: its position in the `peers` vector.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
struct Peer(usize);
/// Position of an entry in the replicated log.
pub type Index = usize;
/// One replicated log entry: the command plus the index and term at which it
/// was appended.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct LogEntry<Command> {
    index: Index,
    term: Term,
    command: Command,
}
/// Shared state of the election timer.
struct ElectionState {
    // (generation counter, deadline). The counter is bumped on every reset /
    // stop so that stale observations of the timer can be detected.
    // Timer will be removed upon shutdown or elected.
    timer: Mutex<(usize, Option<Instant>)>,
    // Wake up the timer thread when the timer is reset or cancelled.
    signal: Condvar,
}
/// A clonable handle to one Raft instance; all clones share the same
/// underlying state through `Arc`s.
#[derive(Clone)]
pub struct Raft<Command> {
    // The protected Raft state: term, vote, log, commit/apply indexes, role.
    inner_state: Arc<Mutex<RaftState<Command>>>,
    // RPC clients for every peer; the entry at `me` is skipped when sending.
    peers: Vec<Arc<RpcClient>>,
    // Our own position in `peers`.
    me: Peer,
    // Stable storage for term, vote and log.
    persister: Arc<dyn Persister>,
    // Channel to the log-sync daemon: `Some(peer)` syncs one peer, `None`
    // syncs all. Populated by run_log_entry_daemon().
    new_log_entry: Option<std::sync::mpsc::Sender<Option<Peer>>>,
    // Wakes the apply-command daemon when commit_index advances.
    apply_command_signal: Arc<Condvar>,
    // Cleared by kill() to shut down all daemons.
    keep_running: Arc<AtomicBool>,
    // Election timer shared with the timer thread.
    election: Arc<ElectionState>,
    snapshot_daemon: SnapshotDaemon,
    // Tokio runtime that carries RPC tasks and heartbeats.
    thread_pool: Arc<tokio::runtime::Runtime>,
    // Each daemon thread holds a clone; kill() waits for all to be dropped.
    stop_wait_group: WaitGroup,
}
/// Arguments of the RequestVote RPC: the candidate's term, identity, and the
/// position/term of its last log entry (used for the up-to-date check).
#[derive(Clone, Debug, Serialize, Deserialize)]
struct RequestVoteArgs {
    term: Term,
    candidate_id: Peer,
    last_log_index: Index,
    last_log_term: Term,
}
/// Reply of the RequestVote RPC: the voter's term and whether it granted us
/// its vote.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct RequestVoteReply {
    term: Term,
    vote_granted: bool,
}
/// Arguments of the AppendEntries RPC. An empty `entries` vector doubles as
/// a heartbeat (see build_heartbeat()).
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AppendEntriesArgs<Command> {
    term: Term,
    leader_id: Peer,
    // Index/term of the entry immediately before `entries`, used by the
    // follower for the consistency check.
    prev_log_index: Index,
    prev_log_term: Term,
    entries: Vec<LogEntry<Command>>,
    leader_commit: Index,
}
/// Reply of the AppendEntries RPC. `committed` carries the follower's first
/// entry after its commit index when the request could not be applied as-is,
/// letting the leader adjust next_index (see append_entries()).
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AppendEntriesReply {
    term: Term,
    success: bool,
    committed: Option<IndexTerm>,
}
// Per-peer counter of queued log-sync requests. Aligned to 64 bytes,
// presumably to keep adjacent counters on separate cache lines — TODO confirm.
#[repr(align(64))]
struct Opening(Arc<AtomicUsize>);
  99. // Commands must be
  100. // 0. 'static: they have to live long enough for thread pools.
  101. // 1. clone: they are put in vectors and request messages.
  102. // 2. serializable: they are sent over RPCs and persisted.
  103. // 3. deserializable: they are restored from storage.
  104. // 4. send: they are referenced in futures.
  105. // 5. default, because we need an element for the first entry.
impl<Command> Raft<Command>
where
    Command: 'static
        + Clone
        + serde::Serialize
        + serde::de::DeserializeOwned
        + Send
        + Default,
{
    /// Create a new raft instance.
    ///
    /// Each instance will create at least 3 + (number of peers) threads. The
    /// extensive usage of threads is to minimize latency.
    pub fn new(
        peers: Vec<RpcClient>,
        me: usize,
        persister: Arc<dyn Persister>,
        apply_command: impl ApplyCommandFnMut<Command>,
        max_state_size_bytes: Option<usize>,
        request_snapshot: impl RequestSnapshotFnMut,
    ) -> Self {
        let peer_size = peers.len();
        // Start as a follower at term 0 with an empty log.
        let mut state = RaftState {
            current_term: Term(0),
            voted_for: None,
            log: log_array::LogArray::create(),
            commit_index: 0,
            last_applied: 0,
            next_index: vec![1; peer_size],
            match_index: vec![0; peer_size],
            current_step: vec![0; peer_size],
            state: State::Follower,
            leader_id: Peer(me),
        };
        // Restore term, vote and log from stable storage if present. The
        // commit index restarts at log.start() (presumably the snapshot
        // boundary — confirm against log_array) and is re-advanced through
        // normal replication.
        if let Ok(persisted_state) =
            PersistedRaftState::try_from(persister.read_state())
        {
            state.current_term = persisted_state.current_term;
            state.voted_for = persisted_state.voted_for;
            state.log = persisted_state.log;
            state.commit_index = state.log.start();
        }
        let election = ElectionState {
            timer: Mutex::new((0, None)),
            signal: Condvar::new(),
        };
        election.reset_election_timer();
        let thread_pool = tokio::runtime::Builder::new_multi_thread()
            .enable_time()
            .thread_name(format!("raft-instance-{}", me))
            .worker_threads(peer_size)
            .build()
            .expect("Creating thread pool should not fail");
        let peers = peers.into_iter().map(Arc::new).collect();
        let mut this = Raft {
            inner_state: Arc::new(Mutex::new(state)),
            peers,
            me: Peer(me),
            persister,
            new_log_entry: None,
            apply_command_signal: Arc::new(Default::default()),
            keep_running: Arc::new(Default::default()),
            election: Arc::new(election),
            snapshot_daemon: Default::default(),
            thread_pool: Arc::new(thread_pool),
            stop_wait_group: WaitGroup::new(),
        };
        // Must be set before any daemon starts, so they all observe `true`.
        this.keep_running.store(true, Ordering::SeqCst);
        // Running in a standalone thread.
        this.run_snapshot_daemon(max_state_size_bytes, request_snapshot);
        // Running in a standalone thread.
        this.run_log_entry_daemon();
        // Running in a standalone thread.
        this.run_apply_command_daemon(apply_command);
        // One off function that schedules many little tasks, running on the
        // internal thread pool.
        this.schedule_heartbeats(Duration::from_millis(
            HEARTBEAT_INTERVAL_MILLIS,
        ));
        // The last step is to start running election timer.
        this.run_election_timer();
        this
    }
}
// Command must be
// 1. clone: they are copied to the persister.
// 2. serialize: they are converted to bytes to persist.
// 3. default: a default value is used as the first element of the log.
impl<Command> Raft<Command>
where
    Command: Clone + serde::Serialize + Default,
{
    /// Handles an incoming RequestVote RPC.
    ///
    /// Grants the vote iff the candidate's term is not stale, we have not
    /// voted for a different peer in this term, and the candidate's log is at
    /// least as up-to-date as ours.
    pub(crate) fn process_request_vote(
        &self,
        args: RequestVoteArgs,
    ) -> RequestVoteReply {
        let mut rf = self.inner_state.lock();
        let term = rf.current_term;
        #[allow(clippy::comparison_chain)]
        if args.term < term {
            // Stale candidate: reject and report our newer term.
            return RequestVoteReply {
                term,
                vote_granted: false,
            };
        } else if args.term > term {
            // Newer term seen: adopt it, forget any vote from the old term,
            // and step down to follower.
            rf.current_term = args.term;
            rf.voted_for = None;
            rf.state = State::Follower;
            self.election.reset_election_timer();
            self.persister.save_state(rf.persisted_state().into());
        }
        let voted_for = rf.voted_for;
        let last_log = rf.log.last_index_term();
        // Vote at most once per term, and only for a candidate whose log is
        // at least as up-to-date as ours: strictly newer last term, or the
        // same last term and at least as long a log.
        if (voted_for.is_none() || voted_for == Some(args.candidate_id))
            && (args.last_log_term > last_log.term
                || (args.last_log_term == last_log.term
                    && args.last_log_index >= last_log.index))
        {
            rf.voted_for = Some(args.candidate_id);
            // It is possible that we have set a timer above when updating the
            // current term. It does not hurt to update the timer again.
            // We do need to persist, though.
            self.election.reset_election_timer();
            self.persister.save_state(rf.persisted_state().into());
            RequestVoteReply {
                term: args.term,
                vote_granted: true,
            }
        } else {
            RequestVoteReply {
                term: args.term,
                vote_granted: false,
            }
        }
    }

    /// Handles an incoming AppendEntries RPC (also used as heartbeat).
    ///
    /// On success, makes the local log consistent with the entries carried by
    /// `args` and advances commit_index up to min(leader_commit, last local
    /// entry).
    pub(crate) fn process_append_entries(
        &self,
        args: AppendEntriesArgs<Command>,
    ) -> AppendEntriesReply {
        let mut rf = self.inner_state.lock();
        if rf.current_term > args.term {
            // Stale leader: reject, reporting the first entry after our
            // commit index so a valid leader can skip ahead.
            return AppendEntriesReply {
                term: rf.current_term,
                success: false,
                committed: Some(rf.log.first_after(rf.commit_index).into()),
            };
        }
        if rf.current_term < args.term {
            rf.current_term = args.term;
            rf.voted_for = None;
            self.persister.save_state(rf.persisted_state().into());
        }
        // A valid leader exists for this term: follow it and reset the timer.
        rf.state = State::Follower;
        rf.leader_id = args.leader_id;
        self.election.reset_election_timer();
        if rf.log.start() > args.prev_log_index
            || rf.log.end() <= args.prev_log_index
            || rf.log[args.prev_log_index].term != args.prev_log_term
        {
            // Consistency check failed. `success` is still true when the only
            // problem is that prev_log_index fell before our log start (those
            // entries are already covered by a snapshot).
            return AppendEntriesReply {
                term: args.term,
                success: args.prev_log_index < rf.log.start(),
                committed: Some(rf.log.first_after(rf.commit_index).into()),
            };
        }
        // Append the entries, truncating our log at the first term conflict.
        for (i, entry) in args.entries.iter().enumerate() {
            let index = i + args.prev_log_index + 1;
            if rf.log.end() > index {
                if rf.log[index].term != entry.term {
                    assert!(
                        index > rf.commit_index,
                        "Entries before commit index should never be rolled back"
                    );
                    rf.log.truncate(index);
                    rf.log.push(entry.clone());
                }
            } else {
                rf.log.push(entry.clone());
            }
        }
        self.persister.save_state(rf.persisted_state().into());
        if args.leader_commit > rf.commit_index {
            // Advance the commit index (capped at our last entry) and wake
            // the apply-command daemon.
            rf.commit_index = if args.leader_commit < rf.log.end() {
                args.leader_commit
            } else {
                rf.log.last_index_term().index
            };
            self.apply_command_signal.notify_one();
        }
        AppendEntriesReply {
            term: args.term,
            success: true,
            committed: None,
        }
    }
}
/// The request chosen for one round of log syncing with a peer: AppendEntries
/// when the needed entries are still in the log, InstallSnapshot when they
/// have been compacted away, or None when we are not the leader.
enum SyncLogEntryOperation<Command> {
    AppendEntries(AppendEntriesArgs<Command>),
    InstallSnapshot(InstallSnapshotArgs),
    None,
}
/// Classified outcome of one AppendEntries / InstallSnapshot exchange
/// (see append_entries() for how replies map to these variants).
enum SyncLogEntryResult {
    // The reply carried a different term: our leadership term is over.
    TermElapsed(Term),
    // The follower accepted, but what we sent fell before its committed
    // prefix; payload is the follower's first entry after its commit index.
    Archived(IndexTerm),
    // The follower's log diverged at prev_log_index; same payload.
    Diverged(IndexTerm),
    Success,
}
  313. // Command must be
  314. // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
  315. // 1. clone: they are copied to the persister.
  316. // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
  317. // 3. serialize: they are converted to bytes to persist.
  318. // 4. default: a default value is used as the first element of log.
  319. impl<Command> Raft<Command>
  320. where
  321. Command: 'static + Clone + Send + serde::Serialize + Default,
  322. {
  323. fn run_election_timer(&self) -> std::thread::JoinHandle<()> {
  324. let this = self.clone();
  325. std::thread::spawn(move || {
  326. let election = this.election.clone();
  327. let mut should_run = None;
  328. while this.keep_running.load(Ordering::SeqCst) {
  329. let mut cancel_handle = should_run
  330. .map(|last_timer_count| this.run_election(last_timer_count))
  331. .flatten();
  332. let mut guard = election.timer.lock();
  333. let (timer_count, deadline) = *guard;
  334. if let Some(last_timer_count) = should_run {
  335. // If the timer was changed more than once, we know the
  336. // last scheduled election should have been cancelled.
  337. if timer_count > last_timer_count + 1 {
  338. cancel_handle.take().map(|c| c.send(()));
  339. }
  340. }
  341. // check the running signal before sleeping. We are holding the
  342. // timer lock, so no one can change it. The kill() method will
  343. // not be able to notify this thread before `wait` is called.
  344. if !this.keep_running.load(Ordering::SeqCst) {
  345. break;
  346. }
  347. should_run = match deadline {
  348. Some(timeout) => loop {
  349. let ret =
  350. election.signal.wait_until(&mut guard, timeout);
  351. let fired = ret.timed_out() && Instant::now() > timeout;
  352. // If the timer has been updated, do not schedule,
  353. // break so that we could cancel.
  354. if timer_count != guard.0 {
  355. // Timer has been updated, cancel current
  356. // election, and block on timeout again.
  357. break None;
  358. } else if fired {
  359. // Timer has fired, remove the timer and allow
  360. // running the next election at timer_count.
  361. // If the next election is cancelled before we
  362. // are back on wait, timer_count will be set to
  363. // a different value.
  364. guard.0 += 1;
  365. guard.1.take();
  366. break Some(guard.0);
  367. }
  368. },
  369. None => {
  370. election.signal.wait(&mut guard);
  371. // The timeout has changed, check again.
  372. None
  373. }
  374. };
  375. drop(guard);
  376. // Whenever woken up, cancel the current running election.
  377. // There are 3 cases we could reach here
  378. // 1. We received an AppendEntries, or decided to vote for
  379. // a peer, and thus turned into a follower. In this case we'll
  380. // be notified by the election signal.
  381. // 2. We are a follower but didn't receive a heartbeat on time,
  382. // or we are a candidate but didn't not collect enough vote on
  383. // time. In this case we'll have a timeout.
  384. // 3. When become a leader, or are shutdown. In this case we'll
  385. // be notified by the election signal.
  386. cancel_handle.map(|c| c.send(()));
  387. }
  388. let stop_wait_group = this.stop_wait_group.clone();
  389. // Making sure the rest of `this` is dropped before the wait group.
  390. drop(this);
  391. drop(stop_wait_group);
  392. })
  393. }
    /// Starts an election: bumps the term, votes for self, and fires
    /// RequestVote RPCs at every peer. Vote counting runs on the thread pool.
    ///
    /// Returns a handle that cancels the vote counting, or `None` when the
    /// election was superseded before it could start.
    fn run_election(
        &self,
        timer_count: usize,
    ) -> Option<futures_channel::oneshot::Sender<()>> {
        let me = self.me;
        let (term, args) = {
            let mut rf = self.inner_state.lock();
            // The previous election is faster and reached the critical section
            // before us. We should stop and not run this election.
            // Or someone else increased the term and the timer is reset.
            if !self.election.try_reset_election_timer(timer_count) {
                return None;
            }
            // New term; vote for ourselves and persist before any RPC leaves.
            rf.current_term.0 += 1;
            rf.voted_for = Some(me);
            rf.state = State::Candidate;
            self.persister.save_state(rf.persisted_state().into());
            let term = rf.current_term;
            let (last_log_index, last_log_term) =
                rf.log.last_index_term().unpack();
            (
                term,
                RequestVoteArgs {
                    term,
                    candidate_id: me,
                    last_log_index,
                    last_log_term,
                },
            )
        };
        let mut votes = vec![];
        for (index, rpc_client) in self.peers.iter().enumerate() {
            if index != self.me.0 {
                // RpcClient must be cloned so that it lives long enough for
                // spawn(), which requires static life time.
                let rpc_client = rpc_client.clone();
                // RPCs are started right away.
                let one_vote = self
                    .thread_pool
                    .spawn(Self::request_vote(rpc_client, args.clone()));
                votes.push(one_vote);
            }
        }
        let (tx, rx) = futures_channel::oneshot::channel();
        self.thread_pool.spawn(Self::count_vote_util_cancelled(
            me,
            term,
            self.inner_state.clone(),
            votes,
            rx,
            self.election.clone(),
            self.new_log_entry.clone().unwrap(),
        ));
        Some(tx)
    }
    const REQUEST_VOTE_RETRY: usize = 1;
    /// Sends one RequestVote RPC (with retry). Returns `Some(granted)` for a
    /// reply from the same term, or `None` when the RPC failed.
    async fn request_vote(
        rpc_client: Arc<RpcClient>,
        args: RequestVoteArgs,
    ) -> Option<bool> {
        let term = args.term;
        // See the comment in send_heartbeat() for this override.
        let rpc_client = rpc_client.as_ref();
        let reply =
            retry_rpc(Self::REQUEST_VOTE_RETRY, RPC_DEADLINE, move |_round| {
                rpc_client.call_request_vote(args.clone())
            })
            .await;
        if let Ok(reply) = reply {
            // A vote only counts when it was granted for the term we asked.
            return Some(reply.vote_granted && reply.term == term);
        }
        None
    }
  467. async fn count_vote_util_cancelled(
  468. me: Peer,
  469. term: Term,
  470. rf: Arc<Mutex<RaftState<Command>>>,
  471. votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
  472. cancel_token: futures_channel::oneshot::Receiver<()>,
  473. election: Arc<ElectionState>,
  474. new_log_entry: std::sync::mpsc::Sender<Option<Peer>>,
  475. ) {
  476. let quorum = votes.len() >> 1;
  477. let mut vote_count = 0;
  478. let mut against_count = 0;
  479. let mut cancel_token = cancel_token;
  480. let mut futures_vec = votes;
  481. while vote_count < quorum
  482. && against_count <= quorum
  483. && !futures_vec.is_empty()
  484. {
  485. // Mixing tokio futures with futures-rs ones. Fingers crossed.
  486. let selected = futures_util::future::select(
  487. cancel_token,
  488. futures_util::future::select_all(futures_vec),
  489. )
  490. .await;
  491. let ((one_vote, _, rest), new_token) = match selected {
  492. futures_util::future::Either::Left(_) => break,
  493. futures_util::future::Either::Right(tuple) => tuple,
  494. };
  495. futures_vec = rest;
  496. cancel_token = new_token;
  497. if let Ok(Some(vote)) = one_vote {
  498. if vote {
  499. vote_count += 1
  500. } else {
  501. against_count += 1
  502. }
  503. }
  504. }
  505. if vote_count < quorum {
  506. return;
  507. }
  508. let mut rf = rf.lock();
  509. if rf.current_term == term && rf.state == State::Candidate {
  510. // We are the leader now. The election timer can be stopped.
  511. election.stop_election_timer();
  512. rf.state = State::Leader;
  513. rf.leader_id = me;
  514. let log_len = rf.log.end();
  515. for item in rf.next_index.iter_mut() {
  516. *item = log_len;
  517. }
  518. for item in rf.match_index.iter_mut() {
  519. *item = 0;
  520. }
  521. for item in rf.current_step.iter_mut() {
  522. *item = 0;
  523. }
  524. // Sync all logs now.
  525. let _ = new_log_entry.send(None);
  526. }
  527. }
    /// Schedules one periodic heartbeat task per peer on the thread pool.
    /// Each task ticks every `interval` and sends an empty AppendEntries
    /// whenever this instance currently believes it is the leader.
    fn schedule_heartbeats(&self, interval: Duration) {
        for (peer_index, rpc_client) in self.peers.iter().enumerate() {
            if peer_index != self.me.0 {
                // rf is now owned by the outer async function.
                let rf = self.inner_state.clone();
                // RPC client must be cloned into the outer async function.
                let rpc_client = rpc_client.clone();
                // Shutdown signal.
                let keep_running = self.keep_running.clone();
                self.thread_pool.spawn(async move {
                    let mut interval = tokio::time::interval(interval);
                    while keep_running.load(Ordering::SeqCst) {
                        interval.tick().await;
                        // Silently skip the tick when we are not the leader.
                        if let Some(args) = Self::build_heartbeat(&rf) {
                            tokio::spawn(Self::send_heartbeat(
                                rpc_client.clone(),
                                args,
                            ));
                        }
                    }
                });
            }
        }
    }
  552. fn build_heartbeat(
  553. rf: &Mutex<RaftState<Command>>,
  554. ) -> Option<AppendEntriesArgs<Command>> {
  555. let rf = rf.lock();
  556. if !rf.is_leader() {
  557. return None;
  558. }
  559. let last_log = rf.log.last_index_term();
  560. let args = AppendEntriesArgs {
  561. term: rf.current_term,
  562. leader_id: rf.leader_id,
  563. prev_log_index: last_log.index,
  564. prev_log_term: last_log.term,
  565. entries: vec![],
  566. leader_commit: rf.commit_index,
  567. };
  568. Some(args)
  569. }
    const HEARTBEAT_RETRY: usize = 1;
    /// Sends one heartbeat to a peer, with retry. The reply body is
    /// discarded: heartbeats only exist to suppress elections.
    async fn send_heartbeat(
        rpc_client: Arc<RpcClient>,
        args: AppendEntriesArgs<Command>,
    ) -> std::io::Result<()> {
        // Passing a reference that is moved to the following closure.
        //
        // It won't work if the rpc_client of type Arc is moved into the closure
        // directly. To clone the Arc, the function must own a mutable reference
        // to it. At the same time, rpc_client.call_append_entries() returns a
        // future that must own a reference, too. That caused a compiling error
        // of FnMut allowing "references to captured variables to escape".
        //
        // By passing-in a reference instead of an Arc, the closure becomes a Fn
        // (no Mut), which can allow references to escape.
        //
        // Another option is to use non-move closures, in which case rpc_client
        // of type Arc can be passed-in directly. However that requires args to
        // be sync because they can be shared by more than one futures.
        let rpc_client = rpc_client.as_ref();
        retry_rpc(Self::HEARTBEAT_RETRY, RPC_DEADLINE, move |_round| {
            rpc_client.call_append_entries(args.clone())
        })
        .await?;
        Ok(())
    }
    /// Starts the daemon that fans log-sync tasks out to peers.
    ///
    /// Messages on the channel are `Some(peer)` to sync one peer or `None`
    /// to sync all. The per-peer `Opening` counter guarantees at most one
    /// queued sync task per peer on the thread pool at any time.
    fn run_log_entry_daemon(&mut self) -> std::thread::JoinHandle<()> {
        let (tx, rx) = std::sync::mpsc::channel::<Option<Peer>>();
        self.new_log_entry.replace(tx);
        // Clone everything that the thread needs.
        let this = self.clone();
        std::thread::spawn(move || {
            let mut openings = vec![];
            openings.resize_with(this.peers.len(), || {
                Opening(Arc::new(AtomicUsize::new(0)))
            });
            let openings = openings; // Not mutable beyond this point.
            while let Ok(peer) = rx.recv() {
                if !this.keep_running.load(Ordering::SeqCst) {
                    break;
                }
                // Only the leader pushes entries to peers.
                if !this.inner_state.lock().is_leader() {
                    continue;
                }
                for (i, rpc_client) in this.peers.iter().enumerate() {
                    // `None` targets every peer; always skip ourselves.
                    if i != this.me.0 && peer.map(|p| p.0 == i).unwrap_or(true)
                    {
                        // Only schedule a new task if the last task has cleared
                        // the queue of RPC requests.
                        if openings[i].0.fetch_add(1, Ordering::SeqCst) == 0 {
                            this.thread_pool.spawn(Self::sync_log_entry(
                                this.inner_state.clone(),
                                rpc_client.clone(),
                                i,
                                this.new_log_entry.clone().unwrap(),
                                openings[i].0.clone(),
                                this.apply_command_signal.clone(),
                            ));
                        }
                    }
                }
            }
            let stop_wait_group = this.stop_wait_group.clone();
            // Making sure the rest of `this` is dropped before the wait group.
            drop(this);
            drop(stop_wait_group);
        })
    }
    /// Performs one round of log synchronization with `peer_index` and
    /// processes the outcome: advances next_index / match_index and possibly
    /// the commit index on success, backs off next_index on divergence, and
    /// re-queues itself through `rerun` when more work remains.
    async fn sync_log_entry(
        rf: Arc<Mutex<RaftState<Command>>>,
        rpc_client: Arc<RpcClient>,
        peer_index: usize,
        rerun: std::sync::mpsc::Sender<Option<Peer>>,
        opening: Arc<AtomicUsize>,
        apply_command_signal: Arc<Condvar>,
    ) {
        // Consume all queued requests for this peer in one batch; stop if
        // another task already drained the queue.
        if opening.swap(0, Ordering::SeqCst) == 0 {
            return;
        }
        let operation = Self::build_sync_log_entry(&rf, peer_index);
        let (term, prev_log_index, match_index, succeeded) = match operation {
            SyncLogEntryOperation::AppendEntries(args) => {
                let term = args.term;
                let prev_log_index = args.prev_log_index;
                // On success the peer matches us up to the last sent entry.
                let match_index = args.prev_log_index + args.entries.len();
                let succeeded = Self::append_entries(&rpc_client, args).await;
                (term, prev_log_index, match_index, succeeded)
            }
            SyncLogEntryOperation::InstallSnapshot(args) => {
                let term = args.term;
                let prev_log_index = args.last_included_index;
                let match_index = args.last_included_index;
                let succeeded =
                    Self::send_install_snapshot(&rpc_client, args).await;
                (term, prev_log_index, match_index, succeeded)
            }
            // Not the leader anymore: nothing to sync.
            SyncLogEntryOperation::None => return,
        };
        let peer = Peer(peer_index);
        match succeeded {
            Ok(SyncLogEntryResult::Success) => {
                let mut rf = rf.lock();
                // A newer term started while the RPC was in flight.
                if rf.current_term != term {
                    return;
                }
                rf.next_index[peer_index] = match_index + 1;
                rf.current_step[peer_index] = 0;
                if match_index > rf.match_index[peer_index] {
                    rf.match_index[peer_index] = match_index;
                    if rf.is_leader() && rf.current_term == term {
                        // Pick the highest index replicated on a majority.
                        // The leader's own slot stays 0, but the leader
                        // implicitly matches everything.
                        // NOTE(review): `mid = len / 2 + 1` indexes out of
                        // bounds for len <= 2, and for even cluster sizes the
                        // chosen index may not yet be on a strict majority —
                        // presumably clusters are always odd-sized; verify.
                        let mut matched = rf.match_index.to_vec();
                        let mid = matched.len() / 2 + 1;
                        matched.sort_unstable();
                        let new_commit_index = matched[mid];
                        // Only entries of the current term are committed by
                        // counting replicas.
                        if new_commit_index > rf.commit_index
                            && rf.log[new_commit_index].term == rf.current_term
                        {
                            rf.commit_index = new_commit_index;
                            apply_command_signal.notify_one();
                        }
                    }
                }
            }
            Ok(SyncLogEntryResult::Archived(committed)) => {
                // Everything we sent fell before the follower's committed
                // prefix; jump next_index forward to its committed index.
                if prev_log_index >= committed.index {
                    eprintln!(
                        "Peer {} misbehaves: send prev log index {}, got committed {:?}",
                        peer_index, prev_log_index, committed
                    );
                }
                let mut rf = rf.lock();
                Self::check_committed(&rf, peer, committed.clone());
                rf.current_step[peer_index] = 0;
                rf.next_index[peer_index] = committed.index;
                // Ignore the error. The log syncing thread must have died.
                let _ = rerun.send(Some(Peer(peer_index)));
            }
            Ok(SyncLogEntryResult::Diverged(committed)) => {
                if prev_log_index < committed.index {
                    eprintln!(
                        "Peer {} misbehaves: diverged at {}, but committed {:?}",
                        peer_index, prev_log_index, committed
                    );
                }
                let mut rf = rf.lock();
                Self::check_committed(&rf, peer, committed.clone());
                // Exponential back-off of next_index (8, 16, ... capped at
                // 5 steps), never dropping below the follower's committed
                // index or index 1.
                let step = &mut rf.current_step[peer_index];
                if *step < 5 {
                    *step += 1;
                }
                let diff = 4 << *step;
                let next_index = &mut rf.next_index[peer_index];
                if diff >= *next_index {
                    *next_index = 1usize;
                } else {
                    *next_index -= diff;
                }
                if *next_index < committed.index {
                    *next_index = committed.index;
                }
                // Ignore the error. The log syncing thread must have died.
                let _ = rerun.send(Some(Peer(peer_index)));
            }
            // Do nothing, not our term anymore.
            Ok(SyncLogEntryResult::TermElapsed(_)) => {}
            Err(_) => {
                // RPC failed: wait one heartbeat interval, then retry.
                tokio::time::sleep(Duration::from_millis(
                    HEARTBEAT_INTERVAL_MILLIS,
                ))
                .await;
                // Ignore the error. The log syncing thread must have died.
                let _ = rerun.send(Some(Peer(peer_index)));
            }
        };
    }
  745. fn check_committed(
  746. rf: &RaftState<Command>,
  747. peer: Peer,
  748. committed: IndexTerm,
  749. ) {
  750. if committed.index < rf.log.start() {
  751. return;
  752. }
  753. let local_term = rf.log.at(committed.index).term;
  754. if committed.term != local_term {
  755. eprintln!(
  756. "{:?} committed log diverged at {:?}: {:?} v.s. leader {:?}",
  757. peer, committed.index, committed.term, local_term
  758. );
  759. }
  760. }
    /// Chooses the request to send to `peer_index`: AppendEntries while the
    /// needed entries are still in the log, InstallSnapshot once they have
    /// been compacted away, or None when we are no longer the leader.
    fn build_sync_log_entry(
        rf: &Mutex<RaftState<Command>>,
        peer_index: usize,
    ) -> SyncLogEntryOperation<Command> {
        let rf = rf.lock();
        if !rf.is_leader() {
            return SyncLogEntryOperation::None;
        }
        // To send AppendEntries request, next_index must be strictly larger
        // than start(). Otherwise we won't be able to know the log term of the
        // entry right before next_index.
        if rf.next_index[peer_index] > rf.log.start() {
            SyncLogEntryOperation::AppendEntries(Self::build_append_entries(
                &rf, peer_index,
            ))
        } else {
            SyncLogEntryOperation::InstallSnapshot(
                Self::build_install_snapshot(&rf),
            )
        }
    }
  782. fn build_append_entries(
  783. rf: &RaftState<Command>,
  784. peer_index: usize,
  785. ) -> AppendEntriesArgs<Command> {
  786. let prev_log_index = rf.next_index[peer_index] - 1;
  787. let prev_log_term = rf.log[prev_log_index].term;
  788. AppendEntriesArgs {
  789. term: rf.current_term,
  790. leader_id: rf.leader_id,
  791. prev_log_index,
  792. prev_log_term,
  793. entries: rf.log.after(rf.next_index[peer_index]).to_vec(),
  794. leader_commit: rf.commit_index,
  795. }
  796. }
    const APPEND_ENTRIES_RETRY: usize = 1;
    /// Sends one AppendEntries RPC (with retry) and classifies the reply
    /// into a SyncLogEntryResult.
    async fn append_entries(
        rpc_client: &RpcClient,
        args: AppendEntriesArgs<Command>,
    ) -> std::io::Result<SyncLogEntryResult> {
        let term = args.term;
        let reply = retry_rpc(
            Self::APPEND_ENTRIES_RETRY,
            RPC_DEADLINE,
            move |_round| rpc_client.call_append_entries(args.clone()),
        )
        .await?;
        // A reply carrying `committed` means the follower could not take the
        // entries as-is: our prev_log fell before its committed prefix
        // (Archived, success == true) or its log diverged (Diverged).
        Ok(if reply.term == term {
            if let Some(committed) = reply.committed {
                if reply.success {
                    SyncLogEntryResult::Archived(committed)
                } else {
                    SyncLogEntryResult::Diverged(committed)
                }
            } else {
                SyncLogEntryResult::Success
            }
        } else {
            SyncLogEntryResult::TermElapsed(reply.term)
        })
    }
    /// Proposes a new command for replication. Returns the (term, index) at
    /// which the command was appended, or `None` when this instance is not
    /// the leader.
    pub fn start(&self, command: Command) -> Option<(Term, Index)> {
        let mut rf = self.inner_state.lock();
        let term = rf.current_term;
        if !rf.is_leader() {
            return None;
        }
        let index = rf.log.add_command(term, command);
        self.persister.save_state(rf.persisted_state().into());
        // Wake the log-sync daemon (`None` targets all peers). A send error
        // only happens at shutdown and is deliberately ignored.
        let _ = self.new_log_entry.clone().unwrap().send(None);
        Some((term, index))
    }
    /// Shuts down this Raft instance: signals every daemon to stop, waits
    /// for them to exit, then tears down the thread pool.
    pub fn kill(mut self) {
        self.keep_running.store(false, Ordering::SeqCst);
        self.election.stop_election_timer();
        // Wake the log-sync daemon so it observes keep_running == false.
        self.new_log_entry.take().map(|n| n.send(None));
        self.apply_command_signal.notify_all();
        self.snapshot_daemon.kill();
        // Wait until every daemon thread has dropped its clone of `self`.
        self.stop_wait_group.wait();
        // By now this is the only remaining reference to the pool.
        std::sync::Arc::try_unwrap(self.thread_pool)
            .expect(
                "All references to the thread pool should have been dropped.",
            )
            .shutdown_timeout(Duration::from_millis(
                HEARTBEAT_INTERVAL_MILLIS * 2,
            ));
    }
  849. pub fn get_state(&self) -> (Term, bool) {
  850. let state = self.inner_state.lock();
  851. (state.current_term, state.is_leader())
  852. }
  853. }
// Interval between leader heartbeats; also reused as the back-off delay after
// a failed log-sync RPC and (doubled) as the shutdown timeout in kill().
pub(crate) const HEARTBEAT_INTERVAL_MILLIS: u64 = 150;
// Election timeout is BASE plus a uniform random jitter in [0, VAR).
const ELECTION_TIMEOUT_BASE_MILLIS: u64 = 150;
const ELECTION_TIMEOUT_VAR_MILLIS: u64 = 250;
// Per-attempt deadline for RPCs issued through retry_rpc().
const RPC_DEADLINE: Duration = Duration::from_secs(2);
  858. impl ElectionState {
  859. fn reset_election_timer(&self) {
  860. let mut guard = self.timer.lock();
  861. guard.0 += 1;
  862. guard.1.replace(Self::election_timeout());
  863. self.signal.notify_one();
  864. }
  865. fn try_reset_election_timer(&self, timer_count: usize) -> bool {
  866. let mut guard = self.timer.lock();
  867. if guard.0 != timer_count {
  868. return false;
  869. }
  870. guard.0 += 1;
  871. guard.1.replace(Self::election_timeout());
  872. self.signal.notify_one();
  873. true
  874. }
  875. fn election_timeout() -> Instant {
  876. Instant::now()
  877. + Duration::from_millis(
  878. ELECTION_TIMEOUT_BASE_MILLIS
  879. + thread_rng().gen_range(0..ELECTION_TIMEOUT_VAR_MILLIS),
  880. )
  881. }
  882. fn stop_election_timer(&self) {
  883. let mut guard = self.timer.lock();
  884. guard.0 += 1;
  885. guard.1.take();
  886. self.signal.notify_one();
  887. }
  888. }
impl<C> Raft<C> {
    /// A callback for applications that never snapshot, presumably usable as
    /// the `request_snapshot` argument of `new()`: it produces an empty
    /// snapshot at the given index.
    pub const NO_SNAPSHOT: fn(Index) -> Snapshot = |index| Snapshot {
        last_included_index: index,
        data: vec![],
    };
}