lib.rs 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850
  1. extern crate bincode;
  2. extern crate futures;
  3. extern crate labrpc;
  4. extern crate rand;
  5. #[macro_use]
  6. extern crate serde_derive;
  7. extern crate tokio;
  8. use std::sync::atomic::{AtomicBool, Ordering};
  9. use std::sync::Arc;
  10. use std::time::{Duration, Instant};
  11. use parking_lot::{Condvar, Mutex};
  12. use rand::{thread_rng, Rng};
  13. pub use crate::rpcs::RpcClient;
  14. use crate::utils::{retry_rpc, DropGuard};
  15. use crossbeam_utils::sync::WaitGroup;
  16. pub mod rpcs;
  17. pub mod utils;
/// The role a Raft peer is currently playing in the cluster.
#[derive(Debug, Eq, PartialEq)]
enum State {
    Follower,
    Candidate,
    // TODO: add PreVote
    Leader,
}
/// A monotonically increasing election term, as described in the Raft paper.
#[derive(
    Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize,
)]
pub struct Term(pub usize);

/// Identifier of a peer: its position in the peers vector.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
struct Peer(usize);

/// Position of an entry in the replicated log.
pub type Index = usize;

/// The payload carried by a log entry.
// NOTE(review): currently a bare usize placeholder; see TODO on LogEntry.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Command(usize);

/// One entry in the replicated log.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct LogEntry {
    term: Term,   // Term in which the entry was created.
    index: Index, // Position of this entry in the log.
    // TODO: Allow sending of arbitrary information.
    command: Command,
}
/// Mutable state of a Raft instance, guarded by a single mutex.
struct RaftState {
    current_term: Term,      // Latest term this peer has seen.
    voted_for: Option<Peer>, // Candidate voted for in the current term, if any.
    log: Vec<LogEntry>,      // Replicated log; index 0 holds a sentinel entry.
    commit_index: Index,     // Highest entry known to be committed.
    last_applied: Index,     // Highest entry applied to the state machine.
    next_index: Vec<Index>,  // Per-peer: next log index to send (leader only).
    match_index: Vec<Index>, // Per-peer: highest replicated index (leader only).
    // Per-peer: backoff exponent used when a peer rejects AppendEntries.
    current_step: Vec<i64>,
    state: State,            // Current role of this peer.
    leader_id: Peer,         // Last known leader.
}
/// Election timer shared between RPC handlers and the timer thread.
struct ElectionState {
    // Timer will be removed upon shutdown or elected.
    // The usize is a version counter bumped on every reset/stop; the Option
    // holds the deadline of the next election, if one is scheduled.
    timer: Mutex<(usize, Option<Instant>)>,
    // Wake up the timer thread when the timer is reset or cancelled.
    signal: Condvar,
}
/// A single Raft peer. Cloning is cheap: the heavy state lives behind `Arc`s.
#[derive(Clone)]
pub struct Raft {
    inner_state: Arc<Mutex<RaftState>>, // All mutable Raft state.
    peers: Vec<RpcClient>,              // RPC clients to every peer, self included.
    me: Peer,                           // Our own position in `peers`.
    // Queue used to trigger log syncing; None until the daemon is started.
    new_log_entry: Option<std::sync::mpsc::Sender<Option<Peer>>>,
    // Wakes the apply-command daemon when commit_index advances.
    apply_command_signal: Arc<Condvar>,
    keep_running: Arc<AtomicBool>, // Cleared by kill() to stop all daemons.
    election: Arc<ElectionState>,  // Shared election timer.
    thread_pool: Arc<tokio::runtime::Runtime>, // Runs RPCs and timers.
    stop_wait_group: WaitGroup,    // Joined in kill() to wait for daemons.
}
/// Arguments of the RequestVote RPC (Raft paper, figure 2).
#[derive(Clone, Debug, Serialize, Deserialize)]
struct RequestVoteArgs {
    term: Term,
    candidate_id: Peer,
    last_log_index: Index,
    last_log_term: Term,
}

/// Reply of the RequestVote RPC.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct RequestVoteReply {
    term: Term,
    vote_granted: bool,
}

/// Arguments of the AppendEntries RPC (Raft paper, figure 2). Also used as a
/// heartbeat when `entries` is empty.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AppendEntriesArgs {
    term: Term,
    leader_id: Peer,
    prev_log_index: Index,
    prev_log_term: Term,
    entries: Vec<LogEntry>,
    leader_commit: Index,
}

/// Reply of the AppendEntries RPC.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AppendEntriesReply {
    term: Term,
    success: bool,
}
  97. impl Raft {
  98. pub fn new<Func>(
  99. peers: Vec<RpcClient>,
  100. me: usize,
  101. apply_command: Func,
  102. ) -> Self
  103. where
  104. Func: 'static + Send + FnMut(Index, Command),
  105. {
  106. let peer_size = peers.len();
  107. let state = RaftState {
  108. current_term: Term(0),
  109. voted_for: None,
  110. log: vec![LogEntry {
  111. term: Term(0),
  112. index: 0,
  113. command: Command(0),
  114. }],
  115. commit_index: 0,
  116. last_applied: 0,
  117. next_index: Vec::with_capacity(peer_size),
  118. match_index: Vec::with_capacity(peer_size),
  119. current_step: Vec::with_capacity(peer_size),
  120. state: State::Follower,
  121. leader_id: Peer(me),
  122. };
  123. let election = ElectionState {
  124. timer: Mutex::new((0, None)),
  125. signal: Condvar::new(),
  126. };
  127. election.reset_election_timer();
  128. let thread_pool = tokio::runtime::Builder::new()
  129. .threaded_scheduler()
  130. .enable_time()
  131. .thread_name(format!("raft-instance-{}", me))
  132. .core_threads(peer_size)
  133. .max_threads(peer_size * 2)
  134. .build()
  135. .expect("Creating thread pool should not fail");
  136. let mut this = Raft {
  137. inner_state: Arc::new(Mutex::new(state)),
  138. peers,
  139. me: Peer(me),
  140. new_log_entry: None,
  141. apply_command_signal: Arc::new(Default::default()),
  142. keep_running: Arc::new(Default::default()),
  143. election: Arc::new(election),
  144. thread_pool: Arc::new(thread_pool),
  145. stop_wait_group: WaitGroup::new(),
  146. };
  147. // TODO: read persist.
  148. this.keep_running.store(true, Ordering::SeqCst);
  149. // Running in a standalone thread.
  150. this.run_log_entry_daemon();
  151. // Running in a standalone thread.
  152. this.run_apply_command_daemon(apply_command);
  153. // One off function that schedules many little tasks, running on the
  154. // internal thread pool.
  155. this.schedule_heartbeats(Duration::from_millis(
  156. HEARTBEAT_INTERVAL_MILLIS,
  157. ));
  158. // The last step is to start running election timer.
  159. this.run_election_timer();
  160. this
  161. }
  162. pub(crate) fn process_request_vote(
  163. &self,
  164. args: RequestVoteArgs,
  165. ) -> RequestVoteReply {
  166. let mut rf = self.inner_state.lock();
  167. let term = rf.current_term;
  168. #[allow(clippy::comparison_chain)]
  169. if args.term < term {
  170. return RequestVoteReply {
  171. term,
  172. vote_granted: false,
  173. };
  174. } else if args.term > term {
  175. rf.current_term = args.term;
  176. rf.voted_for = None;
  177. rf.state = State::Follower;
  178. self.election.reset_election_timer();
  179. rf.persist();
  180. }
  181. let voted_for = rf.voted_for;
  182. let (last_log_index, last_log_term) = rf.last_log_index_and_term();
  183. if (voted_for.is_none() || voted_for == Some(args.candidate_id))
  184. && (args.last_log_term > last_log_term
  185. || (args.last_log_term == last_log_term
  186. && args.last_log_index >= last_log_index))
  187. {
  188. rf.voted_for = Some(args.candidate_id);
  189. // It is possible that we have set a timer above when updating the
  190. // current term. It does not hurt to update the timer again.
  191. // We do need to persist, though.
  192. self.election.reset_election_timer();
  193. rf.persist();
  194. RequestVoteReply {
  195. term: args.term,
  196. vote_granted: true,
  197. }
  198. } else {
  199. RequestVoteReply {
  200. term: args.term,
  201. vote_granted: false,
  202. }
  203. }
  204. }
  205. pub(crate) fn process_append_entries(
  206. &self,
  207. args: AppendEntriesArgs,
  208. ) -> AppendEntriesReply {
  209. let mut rf = self.inner_state.lock();
  210. if rf.current_term > args.term {
  211. return AppendEntriesReply {
  212. term: rf.current_term,
  213. success: false,
  214. };
  215. }
  216. let _ = rf.deferred_persist();
  217. if rf.current_term < args.term {
  218. rf.current_term = args.term;
  219. rf.voted_for = None;
  220. }
  221. rf.state = State::Follower;
  222. rf.leader_id = args.leader_id;
  223. self.election.reset_election_timer();
  224. if rf.log.len() <= args.prev_log_index
  225. || rf.log[args.prev_log_index].term != args.term
  226. {
  227. return AppendEntriesReply {
  228. term: args.term,
  229. success: false,
  230. };
  231. }
  232. for (i, entry) in args.entries.iter().enumerate() {
  233. let index = i + args.prev_log_index + 1;
  234. if rf.log.len() > index {
  235. if rf.log[index].term != entry.term {
  236. rf.log.truncate(index);
  237. rf.log.push(entry.clone());
  238. }
  239. } else {
  240. rf.log.push(entry.clone());
  241. }
  242. }
  243. if args.leader_commit > rf.commit_index {
  244. rf.commit_index = if args.leader_commit < rf.log.len() {
  245. args.leader_commit
  246. } else {
  247. rf.log.len() - 1
  248. };
  249. self.apply_command_signal.notify_one();
  250. }
  251. AppendEntriesReply {
  252. term: args.term,
  253. success: true,
  254. }
  255. }
  256. fn run_election_timer(&self) -> std::thread::JoinHandle<()> {
  257. let this = self.clone();
  258. std::thread::spawn(move || {
  259. let election = this.election.clone();
  260. let mut should_run = None;
  261. while this.keep_running.load(Ordering::SeqCst) {
  262. let mut cancel_handle = should_run
  263. .map(|last_timer_count| this.run_election(last_timer_count))
  264. .flatten();
  265. let mut guard = election.timer.lock();
  266. let (timer_count, deadline) = *guard;
  267. if let Some(last_timer_count) = should_run {
  268. // If the timer was changed more than once, we know the
  269. // last scheduled election should have been cancelled.
  270. if timer_count > last_timer_count + 1 {
  271. cancel_handle.take().map(|c| c.send(()));
  272. }
  273. }
  274. should_run = match deadline {
  275. Some(timeout) => loop {
  276. let ret =
  277. election.signal.wait_until(&mut guard, timeout);
  278. let fired = ret.timed_out() && Instant::now() < timeout;
  279. // If the timer has been updated, do not schedule,
  280. // break so that we could cancel.
  281. if timer_count != guard.0 {
  282. // Timer has been updated, cancel current
  283. // election, and block on timeout again.
  284. break None;
  285. } else if fired {
  286. // Timer has fired, remove the timer and allow
  287. // running the next election at timer_count.
  288. // If the next election is cancelled before we
  289. // are back on wait, timer_count will be set to
  290. // a different value.
  291. guard.0 += 1;
  292. guard.1.take();
  293. break Some(guard.0);
  294. }
  295. },
  296. None => {
  297. election.signal.wait(&mut guard);
  298. // The timeout has changed, check again.
  299. None
  300. }
  301. };
  302. drop(guard);
  303. // Whenever woken up, cancel the current running election.
  304. // There are 3 cases we could reach here
  305. // 1. We received an AppendEntries, or decided to vote for
  306. // a peer, and thus turned into a follower. In this case we'll
  307. // be notified by the election signal.
  308. // 2. We are a follower but didn't receive a heartbeat on time,
  309. // or we are a candidate but didn't not collect enough vote on
  310. // time. In this case we'll have a timeout.
  311. // 3. When become a leader, or are shutdown. In this case we'll
  312. // be notified by the election signal.
  313. cancel_handle.map(|c| c.send(()));
  314. }
  315. // `this` is dropped right here. We cannot drop(this) anymore.
  316. let stop_wait_group = this.stop_wait_group;
  317. // Making sure the rest of `this` is dropped before the wait group.
  318. drop(stop_wait_group);
  319. })
  320. }
  321. fn run_election(
  322. &self,
  323. timer_count: usize,
  324. ) -> Option<futures::channel::oneshot::Sender<()>> {
  325. let me = self.me;
  326. let (term, args) = {
  327. let mut rf = self.inner_state.lock();
  328. // The previous election is faster and reached the critical section
  329. // before us. We should stop and not run this election.
  330. // Or someone else increased the term and the timer is reset.
  331. if !self.election.try_reset_election_timer(timer_count) {
  332. return None;
  333. }
  334. rf.current_term.0 += 1;
  335. rf.voted_for = Some(me);
  336. rf.state = State::Candidate;
  337. rf.persist();
  338. let term = rf.current_term;
  339. let (last_log_index, last_log_term) = rf.last_log_index_and_term();
  340. (
  341. term,
  342. RequestVoteArgs {
  343. term,
  344. candidate_id: me,
  345. last_log_index,
  346. last_log_term,
  347. },
  348. )
  349. };
  350. let mut votes = vec![];
  351. for (index, rpc_client) in self.peers.iter().enumerate() {
  352. if index != self.me.0 {
  353. // RpcClient must be cloned to avoid sending its reference
  354. // across threads.
  355. let rpc_client = rpc_client.clone();
  356. // RPCs are started right away.
  357. let one_vote = self
  358. .thread_pool
  359. .spawn(Self::request_vote(rpc_client, args.clone()));
  360. // Futures must be pinned so that they have Unpin, as required
  361. // by futures::future::select.
  362. votes.push(one_vote);
  363. }
  364. }
  365. let (tx, rx) = futures::channel::oneshot::channel();
  366. self.thread_pool.spawn(Self::count_vote_util_cancelled(
  367. term,
  368. self.inner_state.clone(),
  369. self.election.clone(),
  370. votes,
  371. self.peers.len() / 2,
  372. rx,
  373. self.new_log_entry.clone().unwrap(),
  374. ));
  375. Some(tx)
  376. }
  377. const REQUEST_VOTE_RETRY: usize = 4;
  378. async fn request_vote(
  379. rpc_client: RpcClient,
  380. args: RequestVoteArgs,
  381. ) -> Option<bool> {
  382. let term = args.term;
  383. let reply = retry_rpc(Self::REQUEST_VOTE_RETRY, move |_round| {
  384. rpc_client.clone().call_request_vote(args.clone())
  385. })
  386. .await;
  387. if let Ok(reply) = reply {
  388. return Some(reply.vote_granted && reply.term == term);
  389. }
  390. None
  391. }
  392. async fn count_vote_util_cancelled(
  393. term: Term,
  394. rf: Arc<Mutex<RaftState>>,
  395. election: Arc<ElectionState>,
  396. votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
  397. majority: usize,
  398. cancel_token: futures::channel::oneshot::Receiver<()>,
  399. new_log_entry: std::sync::mpsc::Sender<Option<Peer>>,
  400. ) {
  401. let mut vote_count = 0;
  402. let mut against_count = 0;
  403. let mut cancel_token = cancel_token;
  404. let mut futures_vec = votes;
  405. while vote_count < majority && against_count <= majority {
  406. // Mixing tokio futures with futures-rs ones. Fingers crossed.
  407. let selected = futures::future::select(
  408. cancel_token,
  409. futures::future::select_all(futures_vec),
  410. )
  411. .await;
  412. let ((one_vote, _, rest), new_token) = match selected {
  413. futures::future::Either::Left(_) => break,
  414. futures::future::Either::Right(tuple) => tuple,
  415. };
  416. futures_vec = rest;
  417. cancel_token = new_token;
  418. if let Ok(Some(vote)) = one_vote {
  419. if vote {
  420. vote_count += 1
  421. } else {
  422. against_count += 1
  423. }
  424. }
  425. }
  426. if vote_count < majority {
  427. return;
  428. }
  429. let mut rf = rf.lock();
  430. if rf.current_term == term && rf.state == State::Candidate {
  431. // We are the leader now. The election timer can be stopped.
  432. election.stop_election_timer();
  433. rf.state = State::Leader;
  434. let log_len = rf.log.len();
  435. for item in rf.next_index.iter_mut() {
  436. *item = log_len;
  437. }
  438. for item in rf.match_index.iter_mut() {
  439. *item = 0;
  440. }
  441. for item in rf.current_step.iter_mut() {
  442. *item = 0;
  443. }
  444. // Sync all logs now.
  445. new_log_entry
  446. .send(None)
  447. .expect("Triggering log entry syncing should not fail");
  448. rf.persist();
  449. }
  450. }
  451. fn schedule_heartbeats(&self, interval: Duration) {
  452. for (peer_index, rpc_client) in self.peers.iter().enumerate() {
  453. if peer_index != self.me.0 {
  454. // rf is now owned by the outer async function.
  455. let rf = self.inner_state.clone();
  456. // RPC client must be cloned into the outer async function.
  457. let rpc_client = rpc_client.clone();
  458. // Shutdown signal.
  459. let keep_running = self.keep_running.clone();
  460. self.thread_pool.spawn(async move {
  461. let mut interval = tokio::time::interval(interval);
  462. while keep_running.load(Ordering::SeqCst) {
  463. interval.tick().await;
  464. if let Some(args) = Self::build_heartbeat(&rf) {
  465. tokio::spawn(Self::send_heartbeat(
  466. rpc_client.clone(),
  467. args,
  468. ));
  469. }
  470. }
  471. });
  472. }
  473. }
  474. }
  475. fn build_heartbeat(
  476. rf: &Arc<Mutex<RaftState>>,
  477. ) -> Option<AppendEntriesArgs> {
  478. let rf = rf.lock();
  479. if rf.is_leader() {
  480. return None;
  481. }
  482. let (last_log_index, last_log_term) = rf.last_log_index_and_term();
  483. let args = AppendEntriesArgs {
  484. term: rf.current_term,
  485. leader_id: rf.leader_id,
  486. prev_log_index: last_log_index,
  487. prev_log_term: last_log_term,
  488. entries: vec![],
  489. leader_commit: rf.commit_index,
  490. };
  491. Some(args)
  492. }
  493. const HEARTBEAT_RETRY: usize = 3;
  494. async fn send_heartbeat(
  495. rpc_client: RpcClient,
  496. args: AppendEntriesArgs,
  497. ) -> std::io::Result<()> {
  498. retry_rpc(Self::HEARTBEAT_RETRY, move |_round| {
  499. rpc_client.clone().call_append_entries(args.clone())
  500. })
  501. .await?;
  502. Ok(())
  503. }
  504. fn run_log_entry_daemon(&mut self) -> std::thread::JoinHandle<()> {
  505. let (tx, rx) = std::sync::mpsc::channel::<Option<Peer>>();
  506. self.new_log_entry.replace(tx);
  507. // Clone everything that the thread needs.
  508. let this = self.clone();
  509. std::thread::spawn(move || {
  510. while let Ok(peer) = rx.recv() {
  511. if !this.keep_running.load(Ordering::SeqCst) {
  512. break;
  513. }
  514. for (i, rpc_client) in this.peers.iter().enumerate() {
  515. if i != this.me.0 && peer.map(|p| p.0 == i).unwrap_or(true)
  516. {
  517. this.thread_pool.spawn(Self::sync_log_entry(
  518. this.inner_state.clone(),
  519. rpc_client.clone(),
  520. i,
  521. this.new_log_entry.clone().unwrap(),
  522. this.apply_command_signal.clone(),
  523. ));
  524. }
  525. }
  526. }
  527. // `this` is dropped right here. We cannot drop(this) anymore.
  528. let stop_wait_group = this.stop_wait_group;
  529. // Making sure the rest of `this` is dropped before the wait group.
  530. drop(stop_wait_group);
  531. })
  532. }
  533. async fn sync_log_entry(
  534. rf: Arc<Mutex<RaftState>>,
  535. rpc_client: RpcClient,
  536. peer_index: usize,
  537. rerun: std::sync::mpsc::Sender<Option<Peer>>,
  538. apply_command_signal: Arc<Condvar>,
  539. ) {
  540. // TODO: cancel in flight changes?
  541. let args = Self::build_append_entries(&rf, peer_index);
  542. let term = args.term;
  543. let match_index = args.prev_log_index + args.entries.len();
  544. let succeeded = Self::append_entries(rpc_client, args).await;
  545. match succeeded {
  546. Ok(Some(succeeded)) => {
  547. if succeeded {
  548. let mut rf = rf.lock();
  549. rf.next_index[peer_index] = match_index + 1;
  550. if match_index > rf.match_index[peer_index] {
  551. rf.match_index[peer_index] = match_index;
  552. if rf.is_leader() && rf.current_term == term {
  553. let mut matched = rf.match_index.to_vec();
  554. let mid = matched.len() / 2 + 1;
  555. matched.sort();
  556. let new_commit_index = matched[mid];
  557. if new_commit_index > rf.commit_index
  558. && rf.log[new_commit_index].term
  559. == rf.current_term
  560. {
  561. rf.commit_index = new_commit_index;
  562. apply_command_signal.notify_one();
  563. }
  564. }
  565. }
  566. } else {
  567. let mut rf = rf.lock();
  568. let step = &mut rf.current_step[peer_index];
  569. *step += 1;
  570. let diff = (1 << 8) << *step;
  571. let next_index = &mut rf.next_index[peer_index];
  572. if diff >= *next_index {
  573. *next_index = 1usize;
  574. } else {
  575. *next_index -= diff;
  576. }
  577. rerun
  578. .send(Some(Peer(peer_index)))
  579. .expect("Triggering log entry syncing should not fail");
  580. }
  581. }
  582. // Do nothing, not our term anymore.
  583. Ok(None) => {}
  584. Err(_) => {
  585. tokio::time::delay_for(Duration::from_millis(
  586. HEARTBEAT_INTERVAL_MILLIS,
  587. ))
  588. .await;
  589. rerun
  590. .send(Some(Peer(peer_index)))
  591. .expect("Triggering log entry syncing should not fail");
  592. }
  593. };
  594. }
  595. fn build_append_entries(
  596. rf: &Arc<Mutex<RaftState>>,
  597. peer_index: usize,
  598. ) -> AppendEntriesArgs {
  599. let rf = rf.lock();
  600. let (prev_log_index, prev_log_term) = rf.last_log_index_and_term();
  601. AppendEntriesArgs {
  602. term: rf.current_term,
  603. leader_id: rf.leader_id,
  604. prev_log_index,
  605. prev_log_term,
  606. entries: rf.log[rf.next_index[peer_index]..].to_vec(),
  607. leader_commit: rf.commit_index,
  608. }
  609. }
  610. const APPEND_ENTRIES_RETRY: usize = 3;
  611. async fn append_entries(
  612. rpc_client: RpcClient,
  613. args: AppendEntriesArgs,
  614. ) -> std::io::Result<Option<bool>> {
  615. let term = args.term;
  616. let reply = retry_rpc(Self::APPEND_ENTRIES_RETRY, move |_round| {
  617. rpc_client.clone().call_append_entries(args.clone())
  618. })
  619. .await?;
  620. Ok(if reply.term == term {
  621. Some(reply.success)
  622. } else {
  623. None
  624. })
  625. }
  626. fn run_apply_command_daemon<Func>(
  627. &self,
  628. mut apply_command: Func,
  629. ) -> std::thread::JoinHandle<()>
  630. where
  631. Func: 'static + Send + FnMut(Index, Command),
  632. {
  633. let keep_running = self.keep_running.clone();
  634. let rf = self.inner_state.clone();
  635. let condvar = self.apply_command_signal.clone();
  636. let stop_wait_group = self.stop_wait_group.clone();
  637. std::thread::spawn(move || {
  638. while keep_running.load(Ordering::SeqCst) {
  639. let (mut index, commands) = {
  640. let mut rf = rf.lock();
  641. if rf.last_applied >= rf.commit_index {
  642. condvar.wait_for(
  643. &mut rf,
  644. Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS),
  645. );
  646. }
  647. if rf.last_applied < rf.commit_index {
  648. rf.last_applied += 1;
  649. let index = rf.last_applied;
  650. let commands: Vec<Command> = rf.log[index..]
  651. .iter()
  652. .map(|entry| entry.command.clone())
  653. .collect();
  654. (index, commands)
  655. } else {
  656. continue;
  657. }
  658. };
  659. // Release the lock while calling external functions.
  660. for command in commands {
  661. apply_command(index, command);
  662. index += 1;
  663. }
  664. }
  665. drop(stop_wait_group);
  666. })
  667. }
  668. pub fn start(&self, command: Command) -> Option<(Term, Index)> {
  669. let mut rf = self.inner_state.lock();
  670. let term = rf.current_term;
  671. if !rf.is_leader() {
  672. return None;
  673. }
  674. let index = rf.log.len();
  675. rf.log.push(LogEntry {
  676. term,
  677. index,
  678. command,
  679. });
  680. rf.persist();
  681. self.new_log_entry
  682. .clone()
  683. .unwrap()
  684. .send(None)
  685. .expect("Sending to new log entry queue should never fail.");
  686. Some((term, index))
  687. }
    /// Shuts down this Raft instance: stops every daemon, waits for them
    /// to exit, tears down the thread pool, and persists one last time.
    /// The steps below are strictly ordered.
    pub fn kill(mut self) {
        // Tell all daemons and heartbeat tasks to stop.
        self.keep_running.store(false, Ordering::SeqCst);
        // Wake the election timer thread so it can observe the flag.
        self.election.stop_election_timer();
        // Wake the log-entry daemon; it re-checks keep_running and exits.
        self.new_log_entry.take().map(|n| n.send(None));
        // Wake the apply-command daemon.
        self.apply_command_signal.notify_all();
        // Wait until every daemon thread has dropped its clone of self.
        self.stop_wait_group.wait();
        // All other clones are gone now, so the Arc can be unwrapped. Give
        // in-flight tasks two heartbeat intervals to finish.
        std::sync::Arc::try_unwrap(self.thread_pool)
            .expect(
                "All references to the thread pool should have been dropped.",
            )
            .shutdown_timeout(Duration::from_millis(
                HEARTBEAT_INTERVAL_MILLIS * 2,
            ));
        // Final persist before the instance goes away.
        self.inner_state.lock().persist();
    }
  703. pub fn get_state(&self) -> (Term, bool) {
  704. let state = self.inner_state.lock();
  705. (state.current_term, state.is_leader())
  706. }
  707. }
  708. impl RaftState {
  709. fn persist(&self) {
  710. // TODO: implement
  711. }
  712. fn deferred_persist(&self) -> impl Drop + '_ {
  713. DropGuard::new(move || self.persist())
  714. }
  715. fn last_log_index_and_term(&self) -> (Index, Term) {
  716. let len = self.log.len();
  717. assert!(len > 0, "There should always be at least one entry in log");
  718. (len - 1, self.log.last().unwrap().term)
  719. }
  720. fn is_leader(&self) -> bool {
  721. self.state == State::Leader
  722. }
  723. }
/// Interval between two heartbeats sent by the leader.
const HEARTBEAT_INTERVAL_MILLIS: u64 = 150;
/// Minimum election timeout.
const ELECTION_TIMEOUT_BASE_MILLIS: u64 = 150;
/// Size of the random range added on top of the base election timeout.
const ELECTION_TIMEOUT_VAR_MILLIS: u64 = 250;
impl ElectionState {
    /// Bumps the timer version and schedules a fresh randomized deadline,
    /// waking the timer thread.
    fn reset_election_timer(&self) {
        let mut guard = self.timer.lock();
        guard.0 += 1;
        guard.1.replace(Instant::now() + Self::election_timeout());
        self.signal.notify_one();
    }

    /// Like `reset_election_timer`, but only if the timer version still
    /// equals `timer_count`. Returns whether the reset happened.
    fn try_reset_election_timer(&self, timer_count: usize) -> bool {
        let mut guard = self.timer.lock();
        if guard.0 != timer_count {
            return false;
        }
        guard.0 += 1;
        guard.1.replace(Instant::now() + Self::election_timeout());
        self.signal.notify_one();
        true
    }

    /// A randomized election timeout drawn uniformly from
    /// [base, base + var).
    fn election_timeout() -> Duration {
        Duration::from_millis(
            ELECTION_TIMEOUT_BASE_MILLIS
                + thread_rng().gen_range(0, ELECTION_TIMEOUT_VAR_MILLIS),
        )
    }

    /// Bumps the timer version and removes any scheduled deadline (used at
    /// shutdown or when becoming leader), waking the timer thread.
    fn stop_election_timer(&self) {
        let mut guard = self.timer.lock();
        guard.0 += 1;
        guard.1.take();
        self.signal.notify_one();
    }
}