//! Raft consensus implementation (lib.rs).
  1. extern crate bincode;
  2. extern crate futures_channel;
  3. extern crate futures_util;
  4. extern crate labrpc;
  5. extern crate rand;
  6. #[macro_use]
  7. extern crate serde_derive;
  8. extern crate tokio;
  9. use std::convert::TryFrom;
  10. use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
  11. use std::sync::Arc;
  12. use std::time::{Duration, Instant};
  13. use crossbeam_utils::sync::WaitGroup;
  14. use parking_lot::{Condvar, Mutex};
  15. use rand::{thread_rng, Rng};
  16. use crate::apply_command::ApplyCommandFnMut;
  17. pub use crate::apply_command::ApplyCommandMessage;
  18. use crate::daemon_env::{DaemonEnv, ErrorKind, ThreadEnv};
  19. use crate::index_term::IndexTerm;
  20. use crate::install_snapshot::InstallSnapshotArgs;
  21. use crate::persister::PersistedRaftState;
  22. pub use crate::persister::Persister;
  23. pub(crate) use crate::raft_state::RaftState;
  24. pub(crate) use crate::raft_state::State;
  25. pub use crate::rpcs::RpcClient;
  26. pub use crate::snapshot::Snapshot;
  27. use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
  28. use crate::utils::retry_rpc;
  29. mod apply_command;
  30. mod daemon_env;
  31. mod index_term;
  32. mod install_snapshot;
  33. mod log_array;
  34. mod persister;
  35. mod raft_state;
  36. pub mod rpcs;
  37. mod snapshot;
  38. pub mod utils;
/// A Raft term number. Terms increase monotonically over the cluster's
/// lifetime; `Ord` lets peers compare how recent each other's state is.
#[derive(
    Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize,
)]
pub struct Term(pub usize);
/// Index of a peer within the cluster's peer list.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
struct Peer(usize);
/// Position of an entry in the replicated log.
pub type Index = usize;
/// One replicated log entry: the client `command` plus the `index` and
/// `term` at which it was appended.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct LogEntry<Command> {
    index: Index,
    term: Term,
    command: Command,
}
/// Shared state backing the election timer daemon.
struct ElectionState {
    // (generation counter, deadline). The counter is bumped on every
    // reset/stop so stale sleepers can detect the change.
    // Timer will be removed upon shutdown or elected.
    timer: Mutex<(usize, Option<Instant>)>,
    // Wake up the timer thread when the timer is reset or cancelled.
    signal: Condvar,
}
/// One Raft peer. Cloning shares the underlying state — every field is a
/// handle — which is how the daemon threads receive their copy.
#[derive(Clone)]
pub struct Raft<Command> {
    // Core protocol state, guarded by a single mutex.
    inner_state: Arc<Mutex<RaftState<Command>>>,
    // RPC clients to every peer, indexed by peer id (includes `me`).
    peers: Vec<Arc<RpcClient>>,
    // This peer's own id.
    me: Peer,
    // Durable storage for term / vote / log.
    persister: Arc<dyn Persister>,
    // Queue feeding the log-sync daemon; None until that daemon starts.
    new_log_entry: Option<std::sync::mpsc::Sender<Option<Peer>>>,
    // Wakes the apply-command daemon when commit_index advances.
    apply_command_signal: Arc<Condvar>,
    // Cleared by kill() to shut all daemons down.
    keep_running: Arc<AtomicBool>,
    // Election timer state shared with the timer daemon.
    election: Arc<ElectionState>,
    snapshot_daemon: SnapshotDaemon,
    // Runtime on which RPC tasks are spawned.
    thread_pool: Arc<tokio::runtime::Runtime>,
    daemon_env: DaemonEnv,
    // Waited on in kill() until all daemon threads have exited.
    stop_wait_group: WaitGroup,
}
/// Arguments of the RequestVote RPC (Raft paper, figure 2).
#[derive(Clone, Debug, Serialize, Deserialize)]
struct RequestVoteArgs {
    term: Term,
    candidate_id: Peer,
    last_log_index: Index,
    last_log_term: Term,
}
/// Reply of the RequestVote RPC.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct RequestVoteReply {
    term: Term,
    vote_granted: bool,
}
/// Arguments of the AppendEntries RPC; with empty `entries` it doubles
/// as the leader heartbeat (Raft paper, figure 2).
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AppendEntriesArgs<Command> {
    term: Term,
    leader_id: Peer,
    prev_log_index: Index,
    prev_log_term: Term,
    entries: Vec<LogEntry<Command>>,
    leader_commit: Index,
}
/// Reply of the AppendEntries RPC.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AppendEntriesReply {
    term: Term,
    success: bool,
    // The follower's first entry after its commit index; lets the leader
    // rewind next_index quickly instead of probing one entry at a time.
    committed: Option<IndexTerm>,
}
// Per-peer counter of pending log-sync requests. Aligned to 64 bytes,
// presumably so each counter sits on its own cache line to avoid false
// sharing between peers — TODO confirm.
#[repr(align(64))]
struct Opening(Arc<AtomicUsize>);
// Commands must be
// 0. 'static: they have to live long enough for thread pools.
// 1. clone: they are put in vectors and request messages.
// 2. serializable: they are sent over RPCs and persisted.
// 3. deserializable: they are restored from storage.
// 4. send: they are referenced in futures.
// 5. default, because we need an element for the first entry.
impl<Command> Raft<Command>
where
    Command: 'static
        + Clone
        + serde::Serialize
        + serde::de::DeserializeOwned
        + Send
        + Default,
{
    /// Create a new raft instance.
    ///
    /// Each instance will create at least 3 + (number of peers) threads. The
    /// extensive usage of threads is to minimize latency.
    pub fn new(
        peers: Vec<RpcClient>,
        me: usize,
        persister: Arc<dyn Persister>,
        apply_command: impl ApplyCommandFnMut<Command>,
        max_state_size_bytes: Option<usize>,
        request_snapshot: impl RequestSnapshotFnMut,
    ) -> Self {
        let peer_size = peers.len();
        // Start from a blank follower state ...
        let mut state = RaftState {
            current_term: Term(0),
            voted_for: None,
            log: log_array::LogArray::create(),
            commit_index: 0,
            last_applied: 0,
            next_index: vec![1; peer_size],
            match_index: vec![0; peer_size],
            current_step: vec![0; peer_size],
            state: State::Follower,
            leader_id: Peer(me),
        };
        // ... then overlay whatever survived a restart in the persister.
        if let Ok(persisted_state) =
            PersistedRaftState::try_from(persister.read_state())
        {
            state.current_term = persisted_state.current_term;
            state.voted_for = persisted_state.voted_for;
            state.log = persisted_state.log;
            // Entries up to the log start (the snapshot point) are known
            // to be committed.
            state.commit_index = state.log.start();
        }
        let election = ElectionState {
            timer: Mutex::new((0, None)),
            signal: Condvar::new(),
        };
        election.reset_election_timer();
        let daemon_env = DaemonEnv::create();
        let thread_env = daemon_env.for_thread();
        let thread_pool = tokio::runtime::Builder::new_multi_thread()
            .enable_time()
            .thread_name(format!("raft-instance-{}", me))
            .worker_threads(peer_size)
            .on_thread_start(move || thread_env.clone().attach())
            .on_thread_stop(ThreadEnv::detach)
            .build()
            .expect("Creating thread pool should not fail");
        let peers = peers.into_iter().map(Arc::new).collect();
        let mut this = Raft {
            inner_state: Arc::new(Mutex::new(state)),
            peers,
            me: Peer(me),
            persister,
            new_log_entry: None,
            apply_command_signal: Arc::new(Default::default()),
            keep_running: Arc::new(Default::default()),
            election: Arc::new(election),
            snapshot_daemon: Default::default(),
            thread_pool: Arc::new(thread_pool),
            daemon_env,
            stop_wait_group: WaitGroup::new(),
        };
        this.keep_running.store(true, Ordering::SeqCst);
        // Running in a standalone thread.
        this.run_snapshot_daemon(max_state_size_bytes, request_snapshot);
        // Running in a standalone thread.
        this.run_log_entry_daemon();
        // Running in a standalone thread.
        this.run_apply_command_daemon(apply_command);
        // One off function that schedules many little tasks, running on the
        // internal thread pool.
        this.schedule_heartbeats(Duration::from_millis(
            HEARTBEAT_INTERVAL_MILLIS,
        ));
        // The last step is to start running election timer.
        this.run_election_timer();
        this
    }
}
// Command must be
// 1. clone: they are copied to the persister.
// 2. serialize: they are converted to bytes to persist.
// 3. default: a default value is used as the first element of the log.
impl<Command> Raft<Command>
where
    Command: Clone + serde::Serialize + Default,
{
    /// Handles an incoming RequestVote RPC.
    ///
    /// Grants the vote iff we have not voted for anyone else in
    /// `args.term` and the candidate's log is at least as up-to-date as
    /// ours (last entry term, then last entry index).
    pub(crate) fn process_request_vote(
        &self,
        args: RequestVoteArgs,
    ) -> RequestVoteReply {
        // Note: do not change this to `let _ = ...`.
        let _guard = self.daemon_env.for_scope();
        let mut rf = self.inner_state.lock();
        let term = rf.current_term;
        #[allow(clippy::comparison_chain)]
        if args.term < term {
            // Stale candidate: reject and report our newer term.
            return RequestVoteReply {
                term,
                vote_granted: false,
            };
        } else if args.term > term {
            // Newer term seen: step down and forget our old vote.
            rf.current_term = args.term;
            rf.voted_for = None;
            rf.state = State::Follower;
            self.election.reset_election_timer();
            self.persister.save_state(rf.persisted_state().into());
        }
        let voted_for = rf.voted_for;
        let last_log = rf.log.last_index_term();
        if (voted_for.is_none() || voted_for == Some(args.candidate_id))
            && (args.last_log_term > last_log.term
                || (args.last_log_term == last_log.term
                    && args.last_log_index >= last_log.index))
        {
            rf.voted_for = Some(args.candidate_id);
            // It is possible that we have set a timer above when updating the
            // current term. It does not hurt to update the timer again.
            // We do need to persist, though.
            self.election.reset_election_timer();
            self.persister.save_state(rf.persisted_state().into());
            RequestVoteReply {
                term: args.term,
                vote_granted: true,
            }
        } else {
            RequestVoteReply {
                term: args.term,
                vote_granted: false,
            }
        }
    }

    /// Handles an incoming AppendEntries RPC (heartbeat or replication).
    ///
    /// On success the follower's log matches the leader's through the
    /// last entry of `args.entries`. On failure, `committed` carries our
    /// first entry after commit index so the leader can rewind quickly.
    pub(crate) fn process_append_entries(
        &self,
        args: AppendEntriesArgs<Command>,
    ) -> AppendEntriesReply {
        // Note: do not change this to `let _ = ...`.
        let _guard = self.daemon_env.for_scope();
        let mut rf = self.inner_state.lock();
        if rf.current_term > args.term {
            // Stale leader: reject.
            return AppendEntriesReply {
                term: rf.current_term,
                success: false,
                committed: Some(rf.log.first_after(rf.commit_index).into()),
            };
        }
        if rf.current_term < args.term {
            rf.current_term = args.term;
            rf.voted_for = None;
            self.persister.save_state(rf.persisted_state().into());
        }
        // A valid leader for this term: follow it and push back the timer.
        rf.state = State::Follower;
        rf.leader_id = args.leader_id;
        self.election.reset_election_timer();
        // Reject if we do not hold prev_log_index or its term conflicts.
        if rf.log.start() > args.prev_log_index
            || rf.log.end() <= args.prev_log_index
            || rf.log[args.prev_log_index].term != args.prev_log_term
        {
            return AppendEntriesReply {
                term: args.term,
                // prev_log_index before our snapshot is not a conflict:
                // everything up to log start is already committed.
                success: args.prev_log_index < rf.log.start(),
                committed: Some(rf.log.first_after(rf.commit_index).into()),
            };
        }
        for (i, entry) in args.entries.iter().enumerate() {
            let index = i + args.prev_log_index + 1;
            if rf.log.end() > index {
                // Overlap with our log: truncate at the first term
                // conflict, then append the leader's entry.
                if rf.log[index].term != entry.term {
                    check_or_record!(
                        index > rf.commit_index,
                        ErrorKind::RollbackCommitted(index),
                        "Entries before commit index should never be rolled back",
                        &rf
                    );
                    rf.log.truncate(index);
                    rf.log.push(entry.clone());
                }
            } else {
                rf.log.push(entry.clone());
            }
        }
        self.persister.save_state(rf.persisted_state().into());
        if args.leader_commit > rf.commit_index {
            // Advance commit index to the leader's, bounded by what we
            // actually hold locally.
            rf.commit_index = if args.leader_commit < rf.log.end() {
                args.leader_commit
            } else {
                rf.log.last_index_term().index
            };
            self.apply_command_signal.notify_one();
        }
        self.snapshot_daemon.log_grow(rf.log.start(), rf.log.end());
        AppendEntriesReply {
            term: args.term,
            success: true,
            committed: None,
        }
    }
}
/// What to send a peer on the next log-synchronization round.
enum SyncLogEntryOperation<Command> {
    AppendEntries(AppendEntriesArgs<Command>),
    InstallSnapshot(InstallSnapshotArgs),
    // Nothing to send (e.g. we are no longer the leader).
    None,
}
/// Outcome of one log-synchronization RPC, from the leader's viewpoint.
enum SyncLogEntryResult {
    // The peer is in a newer term; we are not its leader.
    TermElapsed(Term),
    // prev_log_index is already covered by the peer's committed state.
    Archived(IndexTerm),
    // The peer's log conflicts with ours at prev_log_index.
    Diverged(IndexTerm),
    Success,
}
  328. // Command must be
  329. // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
  330. // 1. clone: they are copied to the persister.
  331. // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
  332. // 3. serialize: they are converted to bytes to persist.
  333. // 4. default: a default value is used as the first element of log.
  334. impl<Command> Raft<Command>
  335. where
  336. Command: 'static + Clone + Send + serde::Serialize + Default,
  337. {
  338. fn run_election_timer(&self) {
  339. let this = self.clone();
  340. let join_handle = std::thread::spawn(move || {
  341. // Note: do not change this to `let _ = ...`.
  342. let _guard = this.daemon_env.for_scope();
  343. let election = this.election.clone();
  344. let mut should_run = None;
  345. while this.keep_running.load(Ordering::SeqCst) {
  346. let mut cancel_handle = should_run
  347. .map(|last_timer_count| this.run_election(last_timer_count))
  348. .flatten();
  349. let mut guard = election.timer.lock();
  350. let (timer_count, deadline) = *guard;
  351. if let Some(last_timer_count) = should_run {
  352. // If the timer was changed more than once, we know the
  353. // last scheduled election should have been cancelled.
  354. if timer_count > last_timer_count + 1 {
  355. cancel_handle.take().map(|c| c.send(()));
  356. }
  357. }
  358. // check the running signal before sleeping. We are holding the
  359. // timer lock, so no one can change it. The kill() method will
  360. // not be able to notify this thread before `wait` is called.
  361. if !this.keep_running.load(Ordering::SeqCst) {
  362. break;
  363. }
  364. should_run = match deadline {
  365. Some(timeout) => loop {
  366. let ret =
  367. election.signal.wait_until(&mut guard, timeout);
  368. let fired = ret.timed_out() && Instant::now() > timeout;
  369. // If the timer has been updated, do not schedule,
  370. // break so that we could cancel.
  371. if timer_count != guard.0 {
  372. // Timer has been updated, cancel current
  373. // election, and block on timeout again.
  374. break None;
  375. } else if fired {
  376. // Timer has fired, remove the timer and allow
  377. // running the next election at timer_count.
  378. // If the next election is cancelled before we
  379. // are back on wait, timer_count will be set to
  380. // a different value.
  381. guard.0 += 1;
  382. guard.1.take();
  383. break Some(guard.0);
  384. }
  385. },
  386. None => {
  387. election.signal.wait(&mut guard);
  388. // The timeout has changed, check again.
  389. None
  390. }
  391. };
  392. drop(guard);
  393. // Whenever woken up, cancel the current running election.
  394. // There are 3 cases we could reach here
  395. // 1. We received an AppendEntries, or decided to vote for
  396. // a peer, and thus turned into a follower. In this case we'll
  397. // be notified by the election signal.
  398. // 2. We are a follower but didn't receive a heartbeat on time,
  399. // or we are a candidate but didn't not collect enough vote on
  400. // time. In this case we'll have a timeout.
  401. // 3. When become a leader, or are shutdown. In this case we'll
  402. // be notified by the election signal.
  403. cancel_handle.map(|c| c.send(()));
  404. }
  405. let stop_wait_group = this.stop_wait_group.clone();
  406. // Making sure the rest of `this` is dropped before the wait group.
  407. drop(this);
  408. drop(stop_wait_group);
  409. });
  410. self.daemon_env.watch_daemon(join_handle);
  411. }
    /// Starts an election for a fresh term, voting for ourselves.
    ///
    /// Returns a cancellation handle for the vote-counting task, or None
    /// if the election at `timer_count` was superseded before it began.
    fn run_election(
        &self,
        timer_count: usize,
    ) -> Option<futures_channel::oneshot::Sender<()>> {
        let me = self.me;
        let (term, args) = {
            let mut rf = self.inner_state.lock();
            // The previous election is faster and reached the critical section
            // before us. We should stop and not run this election.
            // Or someone else increased the term and the timer is reset.
            if !self.election.try_reset_election_timer(timer_count) {
                return None;
            }
            // Become a candidate in the next term and persist the vote.
            rf.current_term.0 += 1;
            rf.voted_for = Some(me);
            rf.state = State::Candidate;
            self.persister.save_state(rf.persisted_state().into());
            let term = rf.current_term;
            let (last_log_index, last_log_term) =
                rf.log.last_index_term().unpack();
            (
                term,
                RequestVoteArgs {
                    term,
                    candidate_id: me,
                    last_log_index,
                    last_log_term,
                },
            )
        };
        let mut votes = vec![];
        for (index, rpc_client) in self.peers.iter().enumerate() {
            if index != self.me.0 {
                // RpcClient must be cloned so that it lives long enough for
                // spawn(), which requires static life time.
                let rpc_client = rpc_client.clone();
                // RPCs are started right away.
                let one_vote = self
                    .thread_pool
                    .spawn(Self::request_vote(rpc_client, args.clone()));
                votes.push(one_vote);
            }
        }
        // The sender half cancels vote counting when dropped into use by
        // the election timer daemon.
        let (tx, rx) = futures_channel::oneshot::channel();
        self.thread_pool.spawn(Self::count_vote_util_cancelled(
            me,
            term,
            self.inner_state.clone(),
            votes,
            rx,
            self.election.clone(),
            self.new_log_entry.clone().unwrap(),
        ));
        Some(tx)
    }
  467. const REQUEST_VOTE_RETRY: usize = 1;
  468. async fn request_vote(
  469. rpc_client: Arc<RpcClient>,
  470. args: RequestVoteArgs,
  471. ) -> Option<bool> {
  472. let term = args.term;
  473. // See the comment in send_heartbeat() for this override.
  474. let rpc_client = rpc_client.as_ref();
  475. let reply =
  476. retry_rpc(Self::REQUEST_VOTE_RETRY, RPC_DEADLINE, move |_round| {
  477. rpc_client.call_request_vote(args.clone())
  478. })
  479. .await;
  480. if let Ok(reply) = reply {
  481. return Some(reply.vote_granted && reply.term == term);
  482. }
  483. None
  484. }
    /// Collects RequestVote replies until we win, clearly lose, run out
    /// of outstanding votes, or `cancel_token` fires. On winning,
    /// converts to leader and kicks off log sync for all peers.
    async fn count_vote_util_cancelled(
        me: Peer,
        term: Term,
        rf: Arc<Mutex<RaftState<Command>>>,
        votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
        cancel_token: futures_channel::oneshot::Receiver<()>,
        election: Arc<ElectionState>,
        new_log_entry: std::sync::mpsc::Sender<Option<Peer>>,
    ) {
        // votes.len() is (peers - 1); `quorum` granted replies plus our
        // own implicit vote form a majority.
        let quorum = votes.len() >> 1;
        let mut vote_count = 0;
        let mut against_count = 0;
        let mut cancel_token = cancel_token;
        let mut futures_vec = votes;
        while vote_count < quorum
            && against_count <= quorum
            && !futures_vec.is_empty()
        {
            // Mixing tokio futures with futures-rs ones. Fingers crossed.
            let selected = futures_util::future::select(
                cancel_token,
                futures_util::future::select_all(futures_vec),
            )
            .await;
            let ((one_vote, _, rest), new_token) = match selected {
                // Cancelled: stop counting entirely.
                futures_util::future::Either::Left(_) => break,
                futures_util::future::Either::Right(tuple) => tuple,
            };
            futures_vec = rest;
            cancel_token = new_token;
            // A join error (Err) or an unreachable peer (Ok(None)) counts
            // neither for nor against.
            if let Ok(Some(vote)) = one_vote {
                if vote {
                    vote_count += 1
                } else {
                    against_count += 1
                }
            }
        }
        if vote_count < quorum {
            return;
        }
        let mut rf = rf.lock();
        // Only convert if nothing changed while we were counting.
        if rf.current_term == term && rf.state == State::Candidate {
            // We are the leader now. The election timer can be stopped.
            election.stop_election_timer();
            rf.state = State::Leader;
            rf.leader_id = me;
            // Reinitialize per-peer replication state (figure 2).
            let log_len = rf.log.end();
            for item in rf.next_index.iter_mut() {
                *item = log_len;
            }
            for item in rf.match_index.iter_mut() {
                *item = 0;
            }
            for item in rf.current_step.iter_mut() {
                *item = 0;
            }
            // Sync all logs now.
            let _ = new_log_entry.send(None);
        }
    }
  546. fn schedule_heartbeats(&self, interval: Duration) {
  547. for (peer_index, rpc_client) in self.peers.iter().enumerate() {
  548. if peer_index != self.me.0 {
  549. // rf is now owned by the outer async function.
  550. let rf = self.inner_state.clone();
  551. // RPC client must be cloned into the outer async function.
  552. let rpc_client = rpc_client.clone();
  553. // Shutdown signal.
  554. let keep_running = self.keep_running.clone();
  555. self.thread_pool.spawn(async move {
  556. let mut interval = tokio::time::interval(interval);
  557. while keep_running.load(Ordering::SeqCst) {
  558. interval.tick().await;
  559. if let Some(args) = Self::build_heartbeat(&rf) {
  560. tokio::spawn(Self::send_heartbeat(
  561. rpc_client.clone(),
  562. args,
  563. ));
  564. }
  565. }
  566. });
  567. }
  568. }
  569. }
  570. fn build_heartbeat(
  571. rf: &Mutex<RaftState<Command>>,
  572. ) -> Option<AppendEntriesArgs<Command>> {
  573. let rf = rf.lock();
  574. if !rf.is_leader() {
  575. return None;
  576. }
  577. let last_log = rf.log.last_index_term();
  578. let args = AppendEntriesArgs {
  579. term: rf.current_term,
  580. leader_id: rf.leader_id,
  581. prev_log_index: last_log.index,
  582. prev_log_term: last_log.term,
  583. entries: vec![],
  584. leader_commit: rf.commit_index,
  585. };
  586. Some(args)
  587. }
  588. const HEARTBEAT_RETRY: usize = 1;
  589. async fn send_heartbeat(
  590. rpc_client: Arc<RpcClient>,
  591. args: AppendEntriesArgs<Command>,
  592. ) -> std::io::Result<()> {
  593. // Passing a reference that is moved to the following closure.
  594. //
  595. // It won't work if the rpc_client of type Arc is moved into the closure
  596. // directly. To clone the Arc, the function must own a mutable reference
  597. // to it. At the same time, rpc_client.call_append_entries() returns a
  598. // future that must own a reference, too. That caused a compiling error
  599. // of FnMut allowing "references to captured variables to escape".
  600. //
  601. // By passing-in a reference instead of an Arc, the closure becomes a Fn
  602. // (no Mut), which can allow references to escape.
  603. //
  604. // Another option is to use non-move closures, in which case rpc_client
  605. // of type Arc can be passed-in directly. However that requires args to
  606. // be sync because they can be shared by more than one futures.
  607. let rpc_client = rpc_client.as_ref();
  608. retry_rpc(Self::HEARTBEAT_RETRY, RPC_DEADLINE, move |_round| {
  609. rpc_client.call_append_entries(args.clone())
  610. })
  611. .await?;
  612. Ok(())
  613. }
    /// Starts the log-sync daemon in a dedicated thread.
    ///
    /// The daemon listens on the `new_log_entry` channel: `Some(peer)`
    /// requests a sync with one peer, `None` with all peers. Per-peer
    /// `Opening` counters coalesce bursts of requests into single tasks.
    fn run_log_entry_daemon(&mut self) {
        let (tx, rx) = std::sync::mpsc::channel::<Option<Peer>>();
        self.new_log_entry.replace(tx);
        // Clone everything that the thread needs.
        let this = self.clone();
        let join_handle = std::thread::spawn(move || {
            // Note: do not change this to `let _ = ...`.
            let _guard = this.daemon_env.for_scope();
            let mut openings = vec![];
            openings.resize_with(this.peers.len(), || {
                Opening(Arc::new(AtomicUsize::new(0)))
            });
            let openings = openings; // Not mutable beyond this point.
            while let Ok(peer) = rx.recv() {
                if !this.keep_running.load(Ordering::SeqCst) {
                    break;
                }
                // Followers do not push their log to anyone.
                if !this.inner_state.lock().is_leader() {
                    continue;
                }
                for (i, rpc_client) in this.peers.iter().enumerate() {
                    // Skip ourselves; `None` means sync with all peers.
                    if i != this.me.0 && peer.map(|p| p.0 == i).unwrap_or(true)
                    {
                        // Only schedule a new task if the last task has cleared
                        // the queue of RPC requests.
                        if openings[i].0.fetch_add(1, Ordering::SeqCst) == 0 {
                            this.thread_pool.spawn(Self::sync_log_entry(
                                this.inner_state.clone(),
                                rpc_client.clone(),
                                i,
                                this.new_log_entry.clone().unwrap(),
                                openings[i].0.clone(),
                                this.apply_command_signal.clone(),
                            ));
                        }
                    }
                }
            }
            let stop_wait_group = this.stop_wait_group.clone();
            // Making sure the rest of `this` is dropped before the wait group.
            drop(this);
            drop(stop_wait_group);
        });
        self.daemon_env.watch_daemon(join_handle);
    }
  659. async fn sync_log_entry(
  660. rf: Arc<Mutex<RaftState<Command>>>,
  661. rpc_client: Arc<RpcClient>,
  662. peer_index: usize,
  663. rerun: std::sync::mpsc::Sender<Option<Peer>>,
  664. opening: Arc<AtomicUsize>,
  665. apply_command_signal: Arc<Condvar>,
  666. ) {
  667. if opening.swap(0, Ordering::SeqCst) == 0 {
  668. return;
  669. }
  670. let operation = Self::build_sync_log_entry(&rf, peer_index);
  671. let (term, prev_log_index, match_index, succeeded) = match operation {
  672. SyncLogEntryOperation::AppendEntries(args) => {
  673. let term = args.term;
  674. let prev_log_index = args.prev_log_index;
  675. let match_index = args.prev_log_index + args.entries.len();
  676. let succeeded = Self::append_entries(&rpc_client, args).await;
  677. (term, prev_log_index, match_index, succeeded)
  678. }
  679. SyncLogEntryOperation::InstallSnapshot(args) => {
  680. let term = args.term;
  681. let prev_log_index = args.last_included_index;
  682. let match_index = args.last_included_index;
  683. let succeeded =
  684. Self::send_install_snapshot(&rpc_client, args).await;
  685. (term, prev_log_index, match_index, succeeded)
  686. }
  687. SyncLogEntryOperation::None => return,
  688. };
  689. let peer = Peer(peer_index);
  690. match succeeded {
  691. Ok(SyncLogEntryResult::Success) => {
  692. let mut rf = rf.lock();
  693. if rf.current_term != term {
  694. return;
  695. }
  696. rf.next_index[peer_index] = match_index + 1;
  697. rf.current_step[peer_index] = 0;
  698. if match_index > rf.match_index[peer_index] {
  699. rf.match_index[peer_index] = match_index;
  700. if rf.is_leader() && rf.current_term == term {
  701. let mut matched = rf.match_index.to_vec();
  702. let mid = matched.len() / 2 + 1;
  703. matched.sort_unstable();
  704. let new_commit_index = matched[mid];
  705. if new_commit_index > rf.commit_index
  706. && rf.log[new_commit_index].term == rf.current_term
  707. {
  708. rf.commit_index = new_commit_index;
  709. apply_command_signal.notify_one();
  710. }
  711. }
  712. }
  713. }
  714. Ok(SyncLogEntryResult::Archived(committed)) => {
  715. if prev_log_index >= committed.index {
  716. eprintln!(
  717. "Peer {} misbehaves: send prev log index {}, got committed {:?}",
  718. peer_index, prev_log_index, committed
  719. );
  720. }
  721. let mut rf = rf.lock();
  722. Self::check_committed(&rf, peer, committed.clone());
  723. rf.current_step[peer_index] = 0;
  724. rf.next_index[peer_index] = committed.index;
  725. // Ignore the error. The log syncing thread must have died.
  726. let _ = rerun.send(Some(Peer(peer_index)));
  727. }
  728. Ok(SyncLogEntryResult::Diverged(committed)) => {
  729. if prev_log_index < committed.index {
  730. eprintln!(
  731. "Peer {} misbehaves: diverged at {}, but committed {:?}",
  732. peer_index, prev_log_index, committed
  733. );
  734. }
  735. let mut rf = rf.lock();
  736. Self::check_committed(&rf, peer, committed.clone());
  737. let step = &mut rf.current_step[peer_index];
  738. if *step < 5 {
  739. *step += 1;
  740. }
  741. let diff = 4 << *step;
  742. let next_index = &mut rf.next_index[peer_index];
  743. if diff >= *next_index {
  744. *next_index = 1usize;
  745. } else {
  746. *next_index -= diff;
  747. }
  748. if *next_index < committed.index {
  749. *next_index = committed.index;
  750. }
  751. // Ignore the error. The log syncing thread must have died.
  752. let _ = rerun.send(Some(Peer(peer_index)));
  753. }
  754. // Do nothing, not our term anymore.
  755. Ok(SyncLogEntryResult::TermElapsed(_)) => {}
  756. Err(_) => {
  757. tokio::time::sleep(Duration::from_millis(
  758. HEARTBEAT_INTERVAL_MILLIS,
  759. ))
  760. .await;
  761. // Ignore the error. The log syncing thread must have died.
  762. let _ = rerun.send(Some(Peer(peer_index)));
  763. }
  764. };
  765. }
  766. fn check_committed(
  767. rf: &RaftState<Command>,
  768. peer: Peer,
  769. committed: IndexTerm,
  770. ) {
  771. if committed.index < rf.log.start() {
  772. return;
  773. }
  774. let local_term = rf.log.at(committed.index).term;
  775. if committed.term != local_term {
  776. eprintln!(
  777. "{:?} committed log diverged at {:?}: {:?} v.s. leader {:?}",
  778. peer, committed.index, committed.term, local_term
  779. );
  780. }
  781. }
  782. fn build_sync_log_entry(
  783. rf: &Mutex<RaftState<Command>>,
  784. peer_index: usize,
  785. ) -> SyncLogEntryOperation<Command> {
  786. let rf = rf.lock();
  787. if !rf.is_leader() {
  788. return SyncLogEntryOperation::None;
  789. }
  790. // To send AppendEntries request, next_index must be strictly larger
  791. // than start(). Otherwise we won't be able to know the log term of the
  792. // entry right before next_index.
  793. if rf.next_index[peer_index] > rf.log.start() {
  794. SyncLogEntryOperation::AppendEntries(Self::build_append_entries(
  795. &rf, peer_index,
  796. ))
  797. } else {
  798. SyncLogEntryOperation::InstallSnapshot(
  799. Self::build_install_snapshot(&rf),
  800. )
  801. }
  802. }
  803. fn build_append_entries(
  804. rf: &RaftState<Command>,
  805. peer_index: usize,
  806. ) -> AppendEntriesArgs<Command> {
  807. let prev_log_index = rf.next_index[peer_index] - 1;
  808. let prev_log_term = rf.log[prev_log_index].term;
  809. AppendEntriesArgs {
  810. term: rf.current_term,
  811. leader_id: rf.leader_id,
  812. prev_log_index,
  813. prev_log_term,
  814. entries: rf.log.after(rf.next_index[peer_index]).to_vec(),
  815. leader_commit: rf.commit_index,
  816. }
  817. }
  818. const APPEND_ENTRIES_RETRY: usize = 1;
  819. async fn append_entries(
  820. rpc_client: &RpcClient,
  821. args: AppendEntriesArgs<Command>,
  822. ) -> std::io::Result<SyncLogEntryResult> {
  823. let term = args.term;
  824. let reply = retry_rpc(
  825. Self::APPEND_ENTRIES_RETRY,
  826. RPC_DEADLINE,
  827. move |_round| rpc_client.call_append_entries(args.clone()),
  828. )
  829. .await?;
  830. Ok(if reply.term == term {
  831. if let Some(committed) = reply.committed {
  832. if reply.success {
  833. SyncLogEntryResult::Archived(committed)
  834. } else {
  835. SyncLogEntryResult::Diverged(committed)
  836. }
  837. } else {
  838. SyncLogEntryResult::Success
  839. }
  840. } else {
  841. SyncLogEntryResult::TermElapsed(reply.term)
  842. })
  843. }
  844. pub fn start(&self, command: Command) -> Option<(Term, Index)> {
  845. let mut rf = self.inner_state.lock();
  846. let term = rf.current_term;
  847. if !rf.is_leader() {
  848. return None;
  849. }
  850. let index = rf.log.add_command(term, command);
  851. self.persister.save_state(rf.persisted_state().into());
  852. let _ = self.new_log_entry.clone().unwrap().send(None);
  853. Some((term, index))
  854. }
    /// Shuts down this Raft instance, consuming it.
    ///
    /// The order matters: flag the daemons to stop, wake every daemon
    /// that could be blocked, wait for them to exit, then tear down the
    /// thread pool and finally the daemon environment.
    pub fn kill(mut self) {
        self.keep_running.store(false, Ordering::SeqCst);
        self.election.stop_election_timer();
        self.new_log_entry.take().map(|n| n.send(None));
        self.apply_command_signal.notify_all();
        self.snapshot_daemon.kill();
        self.stop_wait_group.wait();
        // All daemons have exited, so this must be the last Arc.
        std::sync::Arc::try_unwrap(self.thread_pool)
            .expect(
                "All references to the thread pool should have been dropped.",
            )
            .shutdown_timeout(Duration::from_millis(
                HEARTBEAT_INTERVAL_MILLIS * 2,
            ));
        // DaemonEnv must be shutdown after the thread pool, since there might
        // be tasks logging errors in the pool.
        self.daemon_env.shutdown();
    }
  873. pub fn get_state(&self) -> (Term, bool) {
  874. let state = self.inner_state.lock();
  875. (state.current_term, state.is_leader())
  876. }
  877. }
/// Interval between leader heartbeats; also used as the retry back-off
/// in sync_log_entry().
pub(crate) const HEARTBEAT_INTERVAL_MILLIS: u64 = 150;
/// Minimum election timeout, deliberately no shorter than the heartbeat
/// interval so followers do not start needless elections.
const ELECTION_TIMEOUT_BASE_MILLIS: u64 = 150;
/// Random extra timeout added on top of the base to break ties.
const ELECTION_TIMEOUT_VAR_MILLIS: u64 = 250;
/// Per-attempt deadline applied to all RPCs.
const RPC_DEADLINE: Duration = Duration::from_secs(2);
  882. impl ElectionState {
  883. fn reset_election_timer(&self) {
  884. let mut guard = self.timer.lock();
  885. guard.0 += 1;
  886. guard.1.replace(Self::election_timeout());
  887. self.signal.notify_one();
  888. }
  889. fn try_reset_election_timer(&self, timer_count: usize) -> bool {
  890. let mut guard = self.timer.lock();
  891. if guard.0 != timer_count {
  892. return false;
  893. }
  894. guard.0 += 1;
  895. guard.1.replace(Self::election_timeout());
  896. self.signal.notify_one();
  897. true
  898. }
  899. fn election_timeout() -> Instant {
  900. Instant::now()
  901. + Duration::from_millis(
  902. ELECTION_TIMEOUT_BASE_MILLIS
  903. + thread_rng().gen_range(0..ELECTION_TIMEOUT_VAR_MILLIS),
  904. )
  905. }
  906. fn stop_election_timer(&self) {
  907. let mut guard = self.timer.lock();
  908. guard.0 += 1;
  909. guard.1.take();
  910. self.signal.notify_one();
  911. }
  912. }
impl<C> Raft<C> {
    /// A no-op request-snapshot callback for applications that never
    /// take snapshots.
    pub const NO_SNAPSHOT: fn(Index) = |_| {};
}