lib.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. extern crate bincode;
  2. extern crate futures_channel;
  3. extern crate futures_util;
  4. extern crate labrpc;
  5. extern crate rand;
  6. #[macro_use]
  7. extern crate serde_derive;
  8. extern crate tokio;
  9. use std::convert::TryFrom;
  10. use std::sync::atomic::{AtomicBool, Ordering};
  11. use std::sync::Arc;
  12. use std::time::Duration;
  13. use crossbeam_utils::sync::WaitGroup;
  14. use parking_lot::{Condvar, Mutex};
  15. use crate::apply_command::ApplyCommandFnMut;
  16. pub use crate::apply_command::ApplyCommandMessage;
  17. use crate::daemon_env::{DaemonEnv, ThreadEnv};
  18. use crate::election::ElectionState;
  19. use crate::index_term::IndexTerm;
  20. use crate::persister::PersistedRaftState;
  21. pub use crate::persister::Persister;
  22. pub(crate) use crate::raft_state::RaftState;
  23. pub(crate) use crate::raft_state::State;
  24. pub use crate::rpcs::RpcClient;
  25. pub use crate::snapshot::Snapshot;
  26. use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
  27. mod apply_command;
  28. mod daemon_env;
  29. mod election;
  30. mod heartbeats;
  31. mod index_term;
  32. mod install_snapshot;
  33. mod log_array;
  34. mod persister;
  35. mod raft_state;
  36. pub mod rpcs;
  37. mod snapshot;
  38. mod sync_log_entry;
  39. pub mod utils;
/// A Raft term number.
///
/// Terms are totally ordered (`Ord`) so stale messages can be detected by
/// comparison, and serializable because they are persisted and sent in RPCs.
#[derive(
    Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize,
)]
pub struct Term(pub usize);
/// Identifies a cluster member by its position in the peer list
/// (`Raft::new` asserts that `me` is a valid index into `peers`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
struct Peer(usize);

/// Position of an entry in the replicated log.
pub type Index = usize;
/// One replicated log entry: a client `command` tagged with the `index`
/// and leader `term` at which it was appended.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct LogEntry<Command> {
    index: Index,
    term: Term,
    command: Command,
}
/// A single member of a Raft cluster.
///
/// Cloning is shallow: every field is `Copy` or reference-counted, so all
/// clones observe and drive the same underlying instance.
#[derive(Clone)]
pub struct Raft<Command> {
    // Mutable consensus state (term, vote, log, commit index).
    inner_state: Arc<Mutex<RaftState<Command>>>,
    // RPC clients, one per member of the cluster.
    peers: Vec<Arc<RpcClient>>,
    // This instance's position in `peers`.
    me: Peer,
    // Stable storage for term, vote and log.
    persister: Arc<dyn Persister>,
    // Wakes the log-syncing daemon; `None` until the daemon is started
    // (presumably by run_log_entry_daemon — set outside this view).
    new_log_entry: Option<std::sync::mpsc::Sender<Option<Peer>>>,
    // Notified when commit_index advances so commands can be applied.
    apply_command_signal: Arc<Condvar>,
    // Set to true in new() and cleared by kill() to stop the daemons.
    keep_running: Arc<AtomicBool>,
    // Election timer state; reset whenever a valid leader/candidate is heard.
    election: Arc<ElectionState>,
    // Daemon notified of log growth (see process_append_entries); presumably
    // triggers snapshot requests — confirm in the snapshot module.
    snapshot_daemon: SnapshotDaemon,
    // Tokio runtime used for scheduled tasks such as heartbeats.
    thread_pool: Arc<tokio::runtime::Runtime>,
    // Error-recording environment attached to all daemon/pool threads.
    daemon_env: DaemonEnv,
    // kill() blocks on this until all daemon threads have exited.
    stop_wait_group: WaitGroup,
}
/// Arguments of the RequestVote RPC: the candidate's term and identity,
/// plus its last log position so voters can compare log freshness.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct RequestVoteArgs {
    term: Term,
    candidate_id: Peer,
    last_log_index: Index,
    last_log_term: Term,
}
/// Reply to RequestVote: the voter's term (so a stale candidate can step
/// down) and whether the vote was granted.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct RequestVoteReply {
    term: Term,
    vote_granted: bool,
}
/// Arguments of the AppendEntries RPC: entries to replicate after the
/// position (`prev_log_index`, `prev_log_term`), plus the leader's commit
/// index. An empty `entries` acts as a heartbeat.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AppendEntriesArgs<Command> {
    term: Term,
    leader_id: Peer,
    prev_log_index: Index,
    prev_log_term: Term,
    entries: Vec<LogEntry<Command>>,
    leader_commit: Index,
}
/// Reply to AppendEntries.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AppendEntriesReply {
    term: Term,
    success: bool,
    // On failure the follower fills this with the first log position after
    // its commit index (see process_append_entries) — presumably a hint for
    // the leader to back up quickly; confirm in sync_log_entry.
    committed: Option<IndexTerm>,
}
// Commands must be
// 0. 'static: they have to live long enough for thread pools.
// 1. clone: they are put in vectors and request messages.
// 2. serializable: they are sent over RPCs and persisted.
// 3. deserializable: they are restored from storage.
// 4. send: they are referenced in futures.
// 5. default, because we need an element for the first entry.
impl<Command> Raft<Command>
where
    Command: 'static
        + Clone
        + serde::Serialize
        + serde::de::DeserializeOwned
        + Send
        + Default,
{
    /// Create a new raft instance.
    ///
    /// Each instance will create at least 3 + (number of peers) threads. The
    /// extensive usage of threads is to minimize latency.
    ///
    /// Restores term, vote and log from `persister` if it holds a valid
    /// state, then starts the snapshot, log-syncing and apply-command
    /// daemons, schedules heartbeats, and finally arms the election timer.
    pub fn new(
        peers: Vec<RpcClient>,
        me: usize,
        persister: Arc<dyn Persister>,
        apply_command: impl ApplyCommandFnMut<Command>,
        max_state_size_bytes: Option<usize>,
        request_snapshot: impl RequestSnapshotFnMut,
    ) -> Self {
        let peer_size = peers.len();
        assert!(peer_size > me, "My index should be smaller than peer size.");
        let mut state = RaftState::create(peer_size, Peer(me));
        // Restore persisted state, if any. The commit index restarts at the
        // beginning of the restored log.
        if let Ok(persisted_state) =
            PersistedRaftState::try_from(persister.read_state())
        {
            state.current_term = persisted_state.current_term;
            state.voted_for = persisted_state.voted_for;
            state.log = persisted_state.log;
            state.commit_index = state.log.start();
        }
        let election = ElectionState::create();
        election.reset_election_timer();
        let daemon_env = DaemonEnv::create();
        let thread_env = daemon_env.for_thread();
        // One worker thread per peer; each worker attaches to the daemon
        // environment on start and detaches on stop.
        let thread_pool = tokio::runtime::Builder::new_multi_thread()
            .enable_time()
            .thread_name(format!("raft-instance-{}", me))
            .worker_threads(peer_size)
            .on_thread_start(move || thread_env.clone().attach())
            .on_thread_stop(ThreadEnv::detach)
            .build()
            .expect("Creating thread pool should not fail");
        let peers = peers.into_iter().map(Arc::new).collect();
        let mut this = Raft {
            inner_state: Arc::new(Mutex::new(state)),
            peers,
            me: Peer(me),
            persister,
            new_log_entry: None,
            apply_command_signal: Arc::new(Default::default()),
            keep_running: Arc::new(Default::default()),
            election: Arc::new(election),
            snapshot_daemon: Default::default(),
            thread_pool: Arc::new(thread_pool),
            daemon_env,
            stop_wait_group: WaitGroup::new(),
        };
        // Mark the instance alive before any daemon starts polling the flag.
        this.keep_running.store(true, Ordering::SeqCst);
        // Running in a standalone thread.
        this.run_snapshot_daemon(max_state_size_bytes, request_snapshot);
        // Running in a standalone thread.
        this.run_log_entry_daemon();
        // Running in a standalone thread.
        this.run_apply_command_daemon(apply_command);
        // One off function that schedules many little tasks, running on the
        // internal thread pool.
        this.schedule_heartbeats(Duration::from_millis(
            HEARTBEAT_INTERVAL_MILLIS,
        ));
        // The last step is to start running election timer.
        this.run_election_timer();
        this
    }
}
// Command must be
// 1. clone: they are copied to the persister.
// 2. serialize: they are converted to bytes to persist.
// 3. default: a default value is used as the first element of the log.
impl<Command> Raft<Command>
where
    Command: Clone + serde::Serialize + Default,
{
    /// Handles an incoming RequestVote RPC.
    ///
    /// Grants the vote iff
    /// 1. `args.term` is not behind our current term, and
    /// 2. we have not yet voted for a different candidate, and
    /// 3. the candidate's log is at least as up-to-date as ours (strictly
    ///    newer last log term, or the same last log term with a last log
    ///    index no smaller than ours).
    ///
    /// Persists state whenever the current term or the vote changes.
    pub(crate) fn process_request_vote(
        &self,
        args: RequestVoteArgs,
    ) -> RequestVoteReply {
        // Note: do not change this to `let _ = ...`.
        let _guard = self.daemon_env.for_scope();
        let mut rf = self.inner_state.lock();
        let term = rf.current_term;
        #[allow(clippy::comparison_chain)]
        if args.term < term {
            // Stale candidate: reject and report our newer term.
            return RequestVoteReply {
                term,
                vote_granted: false,
            };
        } else if args.term > term {
            // Newer term observed: adopt it, clear the vote and step down.
            rf.current_term = args.term;
            rf.voted_for = None;
            rf.state = State::Follower;
            self.election.reset_election_timer();
            self.persister.save_state(rf.persisted_state().into());
        }
        let voted_for = rf.voted_for;
        let last_log = rf.log.last_index_term();
        // At most one vote per term, and only for a candidate whose log is
        // at least as up-to-date as ours.
        if (voted_for.is_none() || voted_for == Some(args.candidate_id))
            && (args.last_log_term > last_log.term
                || (args.last_log_term == last_log.term
                    && args.last_log_index >= last_log.index))
        {
            rf.voted_for = Some(args.candidate_id);
            // It is possible that we have set a timer above when updating the
            // current term. It does not hurt to update the timer again.
            // We do need to persist, though.
            self.election.reset_election_timer();
            self.persister.save_state(rf.persisted_state().into());
            RequestVoteReply {
                term: args.term,
                vote_granted: true,
            }
        } else {
            RequestVoteReply {
                term: args.term,
                vote_granted: false,
            }
        }
    }

    /// Handles an incoming AppendEntries RPC (heartbeat or replication).
    ///
    /// Rejects stale terms; otherwise acknowledges the sender as leader,
    /// reconciles the local log with `args.entries`, persists, and advances
    /// the commit index up to `args.leader_commit`.
    ///
    /// On failure the reply carries `committed`, the first log position
    /// after our commit index — presumably a hint for the leader to back up
    /// faster than one entry per round trip (confirm in sync_log_entry).
    pub(crate) fn process_append_entries(
        &self,
        args: AppendEntriesArgs<Command>,
    ) -> AppendEntriesReply {
        // Note: do not change this to `let _ = ...`.
        let _guard = self.daemon_env.for_scope();
        let mut rf = self.inner_state.lock();
        if rf.current_term > args.term {
            // Stale leader: reject and report our newer term.
            return AppendEntriesReply {
                term: rf.current_term,
                success: false,
                committed: Some(rf.log.first_after(rf.commit_index).into()),
            };
        }
        if rf.current_term < args.term {
            rf.current_term = args.term;
            rf.voted_for = None;
            self.persister.save_state(rf.persisted_state().into());
        }
        // A valid AppendEntries at our term establishes args.leader_id as
        // the leader: follow it and postpone any election.
        rf.state = State::Follower;
        rf.leader_id = args.leader_id;
        self.election.reset_election_timer();
        // Consistency check: we must hold prev_log_index with the matching
        // term. The ORs short-circuit, so the log is only indexed when
        // prev_log_index lies within [log.start(), log.end()).
        if rf.log.start() > args.prev_log_index
            || rf.log.end() <= args.prev_log_index
            || rf.log[args.prev_log_index].term != args.prev_log_term
        {
            return AppendEntriesReply {
                term: args.term,
                // A prev_log_index before log.start() falls inside our
                // snapshot and therefore counts as matched.
                success: args.prev_log_index < rf.log.start(),
                committed: Some(rf.log.first_after(rf.commit_index).into()),
            };
        }
        // Append the new entries, truncating at the first conflict (same
        // index, different term). Entries that already match are kept.
        for (i, entry) in args.entries.iter().enumerate() {
            let index = i + args.prev_log_index + 1;
            if rf.log.end() > index {
                if rf.log[index].term != entry.term {
                    check_or_record!(
                        index > rf.commit_index,
                        ErrorKind::RollbackCommitted(index),
                        "Entries before commit index should never be rolled back",
                        &rf
                    );
                    rf.log.truncate(index);
                    rf.log.push(entry.clone());
                }
            } else {
                rf.log.push(entry.clone());
            }
        }
        self.persister.save_state(rf.persisted_state().into());
        // Advance the commit index to min(leader_commit, last local index)
        // and wake the apply-command daemon.
        if args.leader_commit > rf.commit_index {
            rf.commit_index = if args.leader_commit < rf.log.end() {
                args.leader_commit
            } else {
                rf.log.last_index_term().index
            };
            self.apply_command_signal.notify_one();
        }
        // Tell the snapshot daemon about the new log extent.
        self.snapshot_daemon.log_grow(rf.log.start(), rf.log.end());
        AppendEntriesReply {
            term: args.term,
            success: true,
            committed: None,
        }
    }
}
  297. // Command must be
  298. // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
  299. // 1. clone: they are copied to the persister.
  300. // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
  301. // 3. serialize: they are converted to bytes to persist.
  302. // 4. default: a default value is used as the first element of log.
  303. impl<Command> Raft<Command>
  304. where
  305. Command: 'static + Clone + Send + serde::Serialize + Default,
  306. {
  307. pub fn start(&self, command: Command) -> Option<(Term, Index)> {
  308. let mut rf = self.inner_state.lock();
  309. let term = rf.current_term;
  310. if !rf.is_leader() {
  311. return None;
  312. }
  313. let index = rf.log.add_command(term, command);
  314. self.persister.save_state(rf.persisted_state().into());
  315. let _ = self.new_log_entry.clone().unwrap().send(None);
  316. Some((term, index))
  317. }
  318. pub fn kill(mut self) {
  319. self.keep_running.store(false, Ordering::SeqCst);
  320. self.election.stop_election_timer();
  321. self.new_log_entry.take().map(|n| n.send(None));
  322. self.apply_command_signal.notify_all();
  323. self.snapshot_daemon.kill();
  324. self.stop_wait_group.wait();
  325. std::sync::Arc::try_unwrap(self.thread_pool)
  326. .expect(
  327. "All references to the thread pool should have been dropped.",
  328. )
  329. .shutdown_timeout(Duration::from_millis(
  330. HEARTBEAT_INTERVAL_MILLIS * 2,
  331. ));
  332. // DaemonEnv must be shutdown after the thread pool, since there might
  333. // be tasks logging errors in the pool.
  334. self.daemon_env.shutdown();
  335. }
  336. pub fn get_state(&self) -> (Term, bool) {
  337. let state = self.inner_state.lock();
  338. (state.current_term, state.is_leader())
  339. }
  340. }
/// Interval between leader heartbeats (see `schedule_heartbeats` in
/// `Raft::new`); `kill` also waits up to twice this long for the thread
/// pool to shut down.
pub(crate) const HEARTBEAT_INTERVAL_MILLIS: u64 = 150;
/// Deadline for outgoing RPCs — not referenced in this chunk; presumably
/// applied by the RPC-sending daemons (confirm in heartbeats/sync_log_entry).
const RPC_DEADLINE: Duration = Duration::from_secs(2);
impl<C> Raft<C> {
    /// A no-op snapshot-request callback that ignores the requested `Index`.
    /// Presumably intended as the `request_snapshot` argument to `Raft::new`
    /// for applications that never take snapshots — confirm at call sites.
    pub const NO_SNAPSHOT: fn(Index) = |_| {};
}