extern crate bincode; extern crate futures_channel; extern crate futures_util; extern crate labrpc; extern crate rand; #[macro_use] extern crate serde_derive; extern crate tokio; use std::convert::TryFrom; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use crossbeam_utils::sync::WaitGroup; use parking_lot::{Condvar, Mutex}; use crate::apply_command::ApplyCommandFnMut; pub use crate::apply_command::ApplyCommandMessage; use crate::daemon_env::{DaemonEnv, ThreadEnv}; use crate::election::ElectionState; use crate::index_term::IndexTerm; use crate::persister::PersistedRaftState; pub use crate::persister::Persister; pub(crate) use crate::raft_state::RaftState; pub(crate) use crate::raft_state::State; pub use crate::rpcs::RpcClient; pub use crate::snapshot::Snapshot; use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon}; mod apply_command; mod daemon_env; mod election; mod heartbeats; mod index_term; mod install_snapshot; mod log_array; mod persister; mod raft_state; pub mod rpcs; mod snapshot; mod sync_log_entry; pub mod utils; #[derive( Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, )] pub struct Term(pub usize); #[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] struct Peer(usize); pub type Index = usize; #[derive(Clone, Debug, Serialize, Deserialize)] struct LogEntry { index: Index, term: Term, command: Command, } #[derive(Clone)] pub struct Raft { inner_state: Arc>>, peers: Vec>, me: Peer, persister: Arc, new_log_entry: Option>>, apply_command_signal: Arc, keep_running: Arc, election: Arc, snapshot_daemon: SnapshotDaemon, thread_pool: Arc, daemon_env: DaemonEnv, stop_wait_group: WaitGroup, } #[derive(Clone, Debug, Serialize, Deserialize)] struct RequestVoteArgs { term: Term, candidate_id: Peer, last_log_index: Index, last_log_term: Term, } #[derive(Clone, Debug, Serialize, Deserialize)] struct RequestVoteReply { term: Term, vote_granted: bool, } #[derive(Clone, Debug, Serialize, Deserialize)] struct AppendEntriesArgs { term: Term, leader_id: Peer, prev_log_index: Index, prev_log_term: Term, entries: Vec>, leader_commit: Index, } #[derive(Clone, Debug, Serialize, Deserialize)] struct AppendEntriesReply { term: Term, success: bool, committed: Option, } // Commands must be // 0. 'static: they have to live long enough for thread pools. // 1. clone: they are put in vectors and request messages. // 2. serializable: they are sent over RPCs and persisted. // 3. deserializable: they are restored from storage. // 4. send: they are referenced in futures. // 5. default, because we need an element for the first entry. impl Raft where Command: 'static + Clone + serde::Serialize + serde::de::DeserializeOwned + Send + Default, { /// Create a new raft instance. /// /// Each instance will create at least 3 + (number of peers) threads. The /// extensive usage of threads is to minimize latency. pub fn new( peers: Vec, me: usize, persister: Arc, apply_command: impl ApplyCommandFnMut, max_state_size_bytes: Option, request_snapshot: impl RequestSnapshotFnMut, ) -> Self { let peer_size = peers.len(); assert!(peer_size > me, "My index should be smaller than peer size."); let mut state = RaftState::create(peer_size, Peer(me)); if let Ok(persisted_state) = PersistedRaftState::try_from(persister.read_state()) { state.current_term = persisted_state.current_term; state.voted_for = persisted_state.voted_for; state.log = persisted_state.log; state.commit_index = state.log.start(); } let election = ElectionState::create(); election.reset_election_timer(); let daemon_env = DaemonEnv::create(); let thread_env = daemon_env.for_thread(); let thread_pool = tokio::runtime::Builder::new_multi_thread() .enable_time() .thread_name(format!("raft-instance-{}", me)) .worker_threads(peer_size) .on_thread_start(move || thread_env.clone().attach()) .on_thread_stop(ThreadEnv::detach) .build() .expect("Creating thread pool should not fail"); let peers = peers.into_iter().map(Arc::new).collect(); let mut this = Raft { inner_state: Arc::new(Mutex::new(state)), peers, me: Peer(me), persister, new_log_entry: None, apply_command_signal: Arc::new(Default::default()), keep_running: Arc::new(Default::default()), election: Arc::new(election), snapshot_daemon: Default::default(), thread_pool: Arc::new(thread_pool), daemon_env, stop_wait_group: WaitGroup::new(), }; this.keep_running.store(true, Ordering::SeqCst); // Running in a standalone thread. this.run_snapshot_daemon(max_state_size_bytes, request_snapshot); // Running in a standalone thread. this.run_log_entry_daemon(); // Running in a standalone thread. this.run_apply_command_daemon(apply_command); // One off function that schedules many little tasks, running on the // internal thread pool. this.schedule_heartbeats(Duration::from_millis( HEARTBEAT_INTERVAL_MILLIS, )); // The last step is to start running election timer. this.run_election_timer(); this } } // Command must be // 1. clone: they are copied to the persister. // 2. serialize: they are converted to bytes to persist. // 3. default: a default value is used as the first element of the log. impl Raft where Command: Clone + serde::Serialize + Default, { pub(crate) fn process_request_vote( &self, args: RequestVoteArgs, ) -> RequestVoteReply { // Note: do not change this to `let _ = ...`. let _guard = self.daemon_env.for_scope(); let mut rf = self.inner_state.lock(); let term = rf.current_term; #[allow(clippy::comparison_chain)] if args.term < term { return RequestVoteReply { term, vote_granted: false, }; } else if args.term > term { rf.current_term = args.term; rf.voted_for = None; rf.state = State::Follower; self.election.reset_election_timer(); self.persister.save_state(rf.persisted_state().into()); } let voted_for = rf.voted_for; let last_log = rf.log.last_index_term(); if (voted_for.is_none() || voted_for == Some(args.candidate_id)) && (args.last_log_term > last_log.term || (args.last_log_term == last_log.term && args.last_log_index >= last_log.index)) { rf.voted_for = Some(args.candidate_id); // It is possible that we have set a timer above when updating the // current term. It does not hurt to update the timer again. // We do need to persist, though. self.election.reset_election_timer(); self.persister.save_state(rf.persisted_state().into()); RequestVoteReply { term: args.term, vote_granted: true, } } else { RequestVoteReply { term: args.term, vote_granted: false, } } } pub(crate) fn process_append_entries( &self, args: AppendEntriesArgs, ) -> AppendEntriesReply { // Note: do not change this to `let _ = ...`. let _guard = self.daemon_env.for_scope(); let mut rf = self.inner_state.lock(); if rf.current_term > args.term { return AppendEntriesReply { term: rf.current_term, success: false, committed: Some(rf.log.first_after(rf.commit_index).into()), }; } if rf.current_term < args.term { rf.current_term = args.term; rf.voted_for = None; self.persister.save_state(rf.persisted_state().into()); } rf.state = State::Follower; rf.leader_id = args.leader_id; self.election.reset_election_timer(); if rf.log.start() > args.prev_log_index || rf.log.end() <= args.prev_log_index || rf.log[args.prev_log_index].term != args.prev_log_term { return AppendEntriesReply { term: args.term, success: args.prev_log_index < rf.log.start(), committed: Some(rf.log.first_after(rf.commit_index).into()), }; } for (i, entry) in args.entries.iter().enumerate() { let index = i + args.prev_log_index + 1; if rf.log.end() > index { if rf.log[index].term != entry.term { check_or_record!( index > rf.commit_index, ErrorKind::RollbackCommitted(index), "Entries before commit index should never be rolled back", &rf ); rf.log.truncate(index); rf.log.push(entry.clone()); } } else { rf.log.push(entry.clone()); } } self.persister.save_state(rf.persisted_state().into()); if args.leader_commit > rf.commit_index { rf.commit_index = if args.leader_commit < rf.log.end() { args.leader_commit } else { rf.log.last_index_term().index }; self.apply_command_signal.notify_one(); } self.snapshot_daemon.log_grow(rf.log.start(), rf.log.end()); AppendEntriesReply { term: args.term, success: true, committed: None, } } } // Command must be // 0. 'static: Raft must be 'static, it is moved to another thread. // 1. clone: they are copied to the persister. // 2. send: Arc>>> must be send, it is moved to another thread. // 3. serialize: they are converted to bytes to persist. // 4. default: a default value is used as the first element of log. impl Raft where Command: 'static + Clone + Send + serde::Serialize + Default, { pub fn start(&self, command: Command) -> Option<(Term, Index)> { let mut rf = self.inner_state.lock(); let term = rf.current_term; if !rf.is_leader() { return None; } let index = rf.log.add_command(term, command); self.persister.save_state(rf.persisted_state().into()); let _ = self.new_log_entry.clone().unwrap().send(None); Some((term, index)) } pub fn kill(mut self) { self.keep_running.store(false, Ordering::SeqCst); self.election.stop_election_timer(); self.new_log_entry.take().map(|n| n.send(None)); self.apply_command_signal.notify_all(); self.snapshot_daemon.kill(); self.stop_wait_group.wait(); std::sync::Arc::try_unwrap(self.thread_pool) .expect( "All references to the thread pool should have been dropped.", ) .shutdown_timeout(Duration::from_millis( HEARTBEAT_INTERVAL_MILLIS * 2, )); // DaemonEnv must be shutdown after the thread pool, since there might // be tasks logging errors in the pool. self.daemon_env.shutdown(); } pub fn get_state(&self) -> (Term, bool) { let state = self.inner_state.lock(); (state.current_term, state.is_leader()) } } pub(crate) const HEARTBEAT_INTERVAL_MILLIS: u64 = 150; const RPC_DEADLINE: Duration = Duration::from_secs(2); impl Raft { pub const NO_SNAPSHOT: fn(Index) = |_| {}; }