// raft.rs
  1. use std::sync::atomic::{AtomicBool, Ordering};
  2. use std::sync::Arc;
  3. use std::time::Duration;
  4. use parking_lot::{Condvar, Mutex};
  5. use serde_derive::{Deserialize, Serialize};
  6. use crate::apply_command::ApplyCommandFnMut;
  7. use crate::daemon_env::{DaemonEnv, ThreadEnv};
  8. use crate::election::ElectionState;
  9. use crate::heartbeats::{HeartbeatsDaemon, HEARTBEAT_INTERVAL};
  10. use crate::persister::PersistedRaftState;
  11. use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
  12. use crate::verify_authority::VerifyAuthorityDaemon;
  13. use crate::{
  14. utils, IndexTerm, Persister, RaftState, RemoteRaft, ReplicableCommand,
  15. };
/// A Raft term number (a newtype over `usize`).
///
/// Derives `Ord`/`PartialOrd` so terms can be compared directly, and
/// `Serialize`/`Deserialize` so terms can be persisted with the log.
#[derive(
    Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize,
)]
pub struct Term(pub usize);
/// The index of a peer within the cluster (a newtype over `usize`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct Peer(pub usize);
/// A handle to a single Raft instance.
///
/// All fields are `Arc`s or small values, so `Clone` is cheap and yields
/// another handle to the same underlying instance.
#[derive(Clone)]
pub struct Raft<Command> {
    // The mutex-protected core state (term, vote, log, commit index, ...).
    pub(crate) inner_state: Arc<Mutex<RaftState<Command>>>,
    // RPC clients for every member of the cluster, indexed by peer.
    pub(crate) peers: Vec<Arc<dyn RemoteRaft<Command>>>,
    // Our own index into `peers`.
    pub(crate) me: Peer,
    // Durable storage for term/vote/log; written in `start()` and `new()`.
    pub(crate) persister: Arc<dyn Persister>,
    // Sender used to wake the log-entry daemon; `None` until the daemon is
    // started (see `new()`), and taken back in `kill()`.
    pub(crate) new_log_entry: Option<utils::SharedSender<Option<Peer>>>,
    // Condvar notified when commands may be ready to apply.
    pub(crate) apply_command_signal: Arc<Condvar>,
    // Global running flag; cleared by `kill()` so daemons can exit.
    pub(crate) keep_running: Arc<AtomicBool>,
    // Election timer and related state.
    pub(crate) election: Arc<ElectionState>,
    pub(crate) snapshot_daemon: SnapshotDaemon,
    pub(crate) verify_authority_daemon: VerifyAuthorityDaemon,
    pub(crate) heartbeats_daemon: HeartbeatsDaemon,
    // Tokio runtime shared by the instance's small tasks; taken and shut
    // down in `kill()`.
    pub(crate) thread_pool: utils::ThreadPoolHolder,
    // Error-reporting / daemon-tracking environment; shut down last.
    pub(crate) daemon_env: DaemonEnv,
}
impl<Command: ReplicableCommand> Raft<Command> {
    /// Create a new raft instance.
    ///
    /// Each instance will create at least 4 + (number of peers) threads. The
    /// extensive usage of threads is to minimize latency.
    ///
    /// Panics if `me` is not a valid index into `peers`, or if persisted
    /// state exists but fails validation.
    pub fn new(
        peers: Vec<impl RemoteRaft<Command> + 'static>,
        me: usize,
        persister: Arc<dyn Persister>,
        apply_command: impl ApplyCommandFnMut<Command>,
        max_state_size_bytes: Option<usize>,
        request_snapshot: impl RequestSnapshotFnMut,
    ) -> Self {
        let peer_size = peers.len();
        assert!(peer_size > me, "My index should be smaller than peer size.");
        let mut state = RaftState::create(peer_size, Peer(me));
        // COMMIT_INDEX_INVARIANT, SNAPSHOT_INDEX_INVARIANT: Initially
        // commit_index = log.start() and commit_index + 1 = log.end(). Thus
        // log.start() <= commit_index and commit_index < log.end() both hold.
        assert_eq!(state.commit_index + 1, state.log.end());
        // Restore persisted state if it parses; a failed read/parse is
        // treated as "no persisted state" and the fresh state above is kept.
        if let Ok(persisted_state) =
            PersistedRaftState::try_from(persister.read_state())
        {
            state.current_term = persisted_state.current_term;
            state.voted_for = persisted_state.voted_for;
            state.log = persisted_state.log;
            state.commit_index = state.log.start();
            // COMMIT_INDEX_INVARIANT, SNAPSHOT_INDEX_INVARIANT: the saved
            // snapshot must have a valid log.start() and log.end(). Thus
            // log.start() <= commit_index and commit_index < log.end() hold.
            assert!(state.commit_index < state.log.end());
            state
                .log
                .validate(state.current_term)
                .expect("Persisted log should not contain error");
        }
        let election = ElectionState::create();
        election.reset_election_timer();
        let daemon_env = DaemonEnv::create();
        let thread_env = daemon_env.for_thread();
        // One worker thread per peer. Every worker attaches the daemon
        // environment on start and detaches it on stop, so tasks can report
        // errors through DaemonEnv.
        let thread_pool = tokio::runtime::Builder::new_multi_thread()
            .enable_time()
            .enable_io()
            .thread_name(format!("raft-instance-{}", me))
            .worker_threads(peer_size)
            .on_thread_start(move || thread_env.clone().attach())
            .on_thread_stop(ThreadEnv::detach)
            .build()
            .expect("Creating thread pool should not fail");
        // Type-erase the peers so one vector can hold any RPC implementation.
        let peers = peers
            .into_iter()
            .map(|r| Arc::new(r) as Arc<dyn RemoteRaft<Command>>)
            .collect();
        let mut this = Raft {
            inner_state: Arc::new(Mutex::new(state)),
            peers,
            me: Peer(me),
            persister,
            // Presumably populated by run_log_entry_daemon() below — start()
            // unwraps it. TODO(review): confirm.
            new_log_entry: None,
            apply_command_signal: Arc::new(Condvar::new()),
            keep_running: Arc::new(AtomicBool::new(true)),
            election: Arc::new(election),
            snapshot_daemon: SnapshotDaemon::create(),
            verify_authority_daemon: VerifyAuthorityDaemon::create(peer_size),
            heartbeats_daemon: HeartbeatsDaemon::create(),
            thread_pool: utils::ThreadPoolHolder::new(thread_pool),
            daemon_env,
        };
        // Running in a standalone thread.
        this.run_verify_authority_daemon();
        // Running in a standalone thread.
        this.run_snapshot_daemon(max_state_size_bytes, request_snapshot);
        // Running in a standalone thread.
        this.run_log_entry_daemon();
        // Running in a standalone thread.
        this.run_apply_command_daemon(apply_command);
        // One off function that schedules many little tasks, running on the
        // internal thread pool.
        this.schedule_heartbeats(HEARTBEAT_INTERVAL);
        // The last step is to start running election timer.
        this.run_election_timer();
        this
    }
}
// `Command` must be:
// 0. `'static`: `Raft<Command>` must be `'static`, since it is moved to another thread.
// 1. `Clone`: commands are copied to the persister.
// 2. `Send`: `Arc<Mutex<Vec<LogEntry<Command>>>>` must be `Send`, since it is moved to another thread.
// 3. `Serialize`: commands are converted to bytes to persist.
impl<Command: ReplicableCommand> Raft<Command> {
    /// Adds a new command to the log, returns its index and the current term.
    ///
    /// Returns `None` if we are not the leader. The log entry may not have been
    /// committed to the log when this method returns. When and if it is
    /// committed, the `apply_command` callback will be called.
    pub fn start(&self, command: Command) -> Option<IndexTerm> {
        let mut rf = self.inner_state.lock();
        let term = rf.current_term;
        if !rf.is_leader() {
            return None;
        }
        let index = rf.log.add_command(term, command);
        // Persist before signalling replication, so the entry is durable
        // before any follower can learn about it.
        self.persister.save_state(rf.persisted_state().into());
        // Several attempts have been made to remove the unwrap below.
        // NOTE(review): this assumes new_log_entry is always Some here, i.e.
        // start() is never called after kill() takes the sender — confirm.
        let _ = self.new_log_entry.as_ref().unwrap().send(None);
        log::info!("{:?} started new entry at {} {:?}", self.me, index, term);
        Some(IndexTerm::pack(index, term))
    }

    // Upper bound on draining the thread pool at shutdown: two heartbeat
    // intervals.
    const SHUTDOWN_TIMEOUT: Duration =
        Duration::from_millis(HEARTBEAT_INTERVAL.as_millis() as u64 * 2);

    /// Cleanly shutdown this instance. This function never blocks forever. It
    /// either panics or returns eventually.
    pub fn kill(mut self) {
        // Clear the running flag first so every daemon observes it and exits.
        self.keep_running.store(false, Ordering::SeqCst);
        self.election.stop_election_timer();
        // Taking the sender drops our copy; the final send wakes the
        // log-entry daemon so it can notice keep_running is now false.
        self.new_log_entry.take().map(|n| n.send(None));
        self.apply_command_signal.notify_all();
        self.snapshot_daemon.kill();
        self.verify_authority_daemon.kill();
        // We cannot easily combine stop_wait_group into DaemonEnv because of
        // shutdown dependencies. The thread pool is not managed by DaemonEnv,
        // but it cannot be shutdown until all daemons are. On the other hand
        // the thread pool uses DaemonEnv, thus must be shutdown before
        // DaemonEnv. The shutdown sequence is stop_wait_group -> thread_pool
        // -> DaemonEnv. The first and third cannot be combined with the second
        // in the middle.
        self.daemon_env.wait_for_daemons();
        self.thread_pool
            .take()
            .expect(
                "All references to the thread pool should have been dropped.",
            )
            .shutdown_timeout(Self::SHUTDOWN_TIMEOUT);
        // DaemonEnv must be shutdown after the thread pool, since there might
        // be tasks logging errors in the pool.
        self.daemon_env.shutdown();
    }

    /// Returns the current term and whether we are the leader.
    ///
    /// Take a quick peek at the current state of this instance. The returned
    /// value is stale as soon as this function returns.
    pub fn get_state(&self) -> (Term, bool) {
        let state = self.inner_state.lock();
        (state.current_term, state.is_leader())
    }
}
  184. #[cfg(test)]
  185. mod tests {
  186. #[test]
  187. fn test_raft_must_sync() {
  188. let optional_raft: Option<super::Raft<i32>> = None;
  189. fn must_sync<T: Sync>(value: T) {
  190. drop(value)
  191. }
  192. must_sync(optional_raft)
  193. // The following raft is not Sync.
  194. // let optional_raft: Option<super::Raft<std::rc::Rc<i32>>> = None;
  195. }
  196. }