// daemon_env.rs (14 KB)
  use std::cell::RefCell;
  use std::sync::{Arc, Weak};

  use crossbeam_utils::sync::WaitGroup;
  use parking_lot::Mutex;
  #[cfg(all(not(test), feature = "integration-test"))]
  use test_utils::thread_local_logger::{self, LocalLogger};

  use crate::{IndexTerm, Peer, RaftState, State, Term};
  /// A convenient macro to record errors.
  ///
  /// When `$condition` evaluates to `false`, an error is recorded in the daemon
  /// environment attached to the current thread. Arguments, in order:
  ///
  /// 1. the condition to check,
  /// 2. the `ErrorKind` to record,
  /// 3. a message (anything accepted by `DaemonEnv::record_error`),
  /// 4. a reference to the `RaftState` to snapshot.
  ///
  /// The call site is captured automatically as `file:line`.
  ///
  /// The environment is obtained through `ThreadEnv::upgrade()`, so a failing
  /// check on a thread that has no environment attached panics.
  #[macro_export]
  macro_rules! check_or_record {
      ($condition:expr, $error_kind:expr, $message:expr, $rf:expr) => {
          if !$condition {
              // `$crate` (not `crate`) keeps the path anchored to the crate that
              // defines this macro, regardless of where it is expanded.
              $crate::daemon_env::ThreadEnv::upgrade().record_error(
                  $error_kind,
                  $message,
                  $rf,
                  concat!(file!(), ":", line!()),
              )
          }
      };
  }
  22. /// Environment for daemons.
  23. ///
  24. /// Each daemon thread should hold a copy of this struct, either directly or
  25. /// through a copy of [`crate::Raft`]. It can be used for logging unexpected
  26. /// errors to a central location, which cause a failure at shutdown. It also
  27. /// checks daemon thread panics and collect information if they do.
  28. #[derive(Clone, Debug)]
  29. pub(crate) struct DaemonEnv {
  30. data: Arc<Mutex<DaemonEnvData>>,
  31. thread_env: ThreadEnv,
  32. stop_wait_group: Option<WaitGroup>,
  33. }
  /// Identifies a daemon thread.
  ///
  /// The `Debug` name of the variant is used to build the thread name
  /// (`ruaft-daemon-<Variant>`) and to report panics at shutdown.
  #[derive(Debug)]
  pub(crate) enum Daemon {
      Snapshot,
      ElectionTimer,
      SyncLogEntries,
      ApplyCommand,
      VerifyAuthority,
  }
  42. #[derive(Debug, Default)]
  43. struct DaemonEnvData {
  44. errors: Vec<Error>,
  45. daemons: Vec<(Daemon, std::thread::JoinHandle<()>)>,
  46. }
  47. #[allow(dead_code)]
  48. #[derive(Debug)]
  49. pub(crate) struct Error {
  50. error_kind: ErrorKind,
  51. message: String,
  52. raft_state: StrippedRaftState,
  53. file_line: &'static str,
  54. }
  55. #[allow(dead_code)]
  56. #[derive(Debug)]
  57. pub(crate) enum ErrorKind {
  58. /// The leader sent log entries that do not match a committed log entry.
  59. /// It could be caused by a misbehaving leader, or that the leader should
  60. /// never have been elected, or that this Raft instance incorrectly moved
  61. /// its commit index.
  62. RollbackCommitted(usize),
  63. /// Similar to [`Self::RollbackCommitted`], but the leader sent a snapshot
  64. /// that is inconsistent with a committed log entry.
  65. SnapshotBeforeCommitted(usize, Term),
  66. /// The application sent a snapshot that contains items beyond the log end.
  67. SnapshotAfterLogEnd(usize),
  68. /// The application sent a snapshot that contains items that are not
  69. /// committed yet. Only committed items are sent to the application.
  70. SnapshotNotCommitted(usize),
  71. /// The recipient of the `InstallSnapshot` RPC should have been able to
  72. /// verify the term at index `.0` but did not. The index `.0` is after
  73. /// their commit index `.1`, and thus not yet committed or archived into a
  74. /// local snapshot. The recipient should still have the log entry at `.0`.
  75. RefusedSnapshotAfterCommitted(usize, usize),
  76. /// Similar to [`Self::RollbackCommitted`], but this error is logged by the
  77. /// leader after receiving a reply from a follower.
  78. DivergedBeforeCommitted(usize, usize),
  79. /// A follower committed a log entry that is different from the leader. An
  80. /// opportunistic check that looks for log mismatches, missing committed log
  81. /// entries or other corruptions.
  82. DivergedAtCommitted(usize),
  83. /// A follower received an AppendEntries RPC with a `prev_log_index` that
  84. /// is inconsistent with the index of entries included in the same RPC.
  85. AppendEntriesIndexMismatch(usize, Vec<IndexTerm>),
  86. /// A follower committed a log entry that is beyond the log end of the
  87. /// leader. An opportunistic check that looks for log mismatches, missing
  88. /// committed log entries or other corruptions.
  89. CommittedBeyondEnd(usize),
  90. /// When examining a sync log entry response from a follower, the leader
  91. /// noticed that a log entry that it sent out is no longer in its own log.
  92. LeaderLogShrunk(usize),
  93. }
  94. impl DaemonEnv {
  95. /// Record an error, with a stripped version of the state of this instance.
  96. /// Use macro `check_or_record` to auto populate the environment.
  97. pub fn record_error<T, S: AsRef<str>>(
  98. &self,
  99. error_kind: ErrorKind,
  100. message: S,
  101. raft_state: &RaftState<T>,
  102. file_line: &'static str,
  103. ) {
  104. self.data.lock().errors.push(Error {
  105. error_kind,
  106. message: message.as_ref().into(),
  107. raft_state: Self::strip_data(raft_state),
  108. file_line,
  109. })
  110. }
  111. /// Register a daemon thread to make sure it is correctly shutdown when the
  112. /// Raft instance is killed.
  113. pub fn watch_daemon<F, T>(&self, daemon: Daemon, func: F)
  114. where
  115. F: FnOnce() -> T,
  116. F: Send + 'static,
  117. T: Send + 'static,
  118. {
  119. let thread_env = self.for_thread();
  120. let stop_wait_group = self
  121. .stop_wait_group
  122. .clone()
  123. .expect("Expecting a valid stop wait group when creating daemons");
  124. let thread = std::thread::Builder::new()
  125. .name(format!("ruaft-daemon-{:?}", daemon))
  126. .spawn(move || {
  127. thread_env.attach();
  128. func();
  129. ThreadEnv::detach();
  130. drop(stop_wait_group);
  131. })
  132. .expect("Creating daemon thread should never fail");
  133. self.data.lock().daemons.push((daemon, thread));
  134. }
  135. pub fn wait_for_daemons(&mut self) {
  136. if let Some(stop_wait_group) = self.stop_wait_group.take() {
  137. stop_wait_group.wait();
  138. } else {
  139. panic!("Daemons can only be waited once")
  140. }
  141. }
  142. /// Makes sure that all daemons have been shutdown, no more errors can be
  143. /// added, checks if any error has been added, or if any daemon panicked.
  144. pub fn shutdown(self) {
  145. let data = Arc::try_unwrap(self.data)
  146. .unwrap_or_else(|_| {
  147. panic!("No one should be holding daemon env at shutdown.")
  148. })
  149. .into_inner();
  150. let daemon_panics: Vec<String> = data
  151. .daemons
  152. .into_iter()
  153. .filter_map(|(daemon, join_handle)| {
  154. let err = join_handle.join().err()?;
  155. let err_str = err.downcast_ref::<&str>().map(|s| s.to_owned());
  156. let err_string =
  157. err.downcast_ref::<String>().map(|s| s.as_str());
  158. let err =
  159. err_str.or(err_string).unwrap_or("unknown panic error");
  160. Some(format!("\nDaemon {:?} panicked: {}", daemon, err))
  161. })
  162. .collect();
  163. let recorded_errors: Vec<String> = data
  164. .errors
  165. .iter()
  166. .map(|error| format!("\n{:?}", error))
  167. .collect();
  168. if !daemon_panics.is_empty() || !recorded_errors.is_empty() {
  169. // Do not panic again if we are cleaning up panicking threads.
  170. if std::thread::panicking() {
  171. eprintln!(
  172. "\n{} daemon panic(s):{}\n{} error(s):{}\n",
  173. daemon_panics.len(),
  174. daemon_panics.join(""),
  175. recorded_errors.len(),
  176. recorded_errors.join("")
  177. )
  178. } else {
  179. panic!(
  180. "\n{} daemon panic(s):{}\n{} error(s):{}\n",
  181. daemon_panics.len(),
  182. daemon_panics.join(""),
  183. recorded_errors.len(),
  184. recorded_errors.join("")
  185. )
  186. }
  187. }
  188. }
  189. fn strip_data<T>(raft: &RaftState<T>) -> StrippedRaftState {
  190. StrippedRaftState {
  191. current_term: raft.current_term,
  192. voted_for: raft.voted_for,
  193. log: raft.log.all_index_term(),
  194. commit_index: raft.commit_index,
  195. last_applied: raft.last_applied,
  196. state: raft.state,
  197. leader_id: raft.leader_id,
  198. }
  199. }
  200. }
  201. #[allow(dead_code)]
  202. #[derive(Debug)]
  203. struct StrippedRaftState {
  204. current_term: Term,
  205. voted_for: Option<Peer>,
  206. log: Vec<IndexTerm>,
  207. commit_index: usize,
  208. last_applied: usize,
  209. state: State,
  210. leader_id: Peer,
  211. }
  212. impl DaemonEnv {
  213. /// Creates a daemon environment. Each Raft instance should share the same
  214. /// environment. It should be added to any thread that executes Raft code.
  215. /// Use [`DaemonEnv::for_thread`] or [`DaemonEnv::for_scope`] to register
  216. /// the environment.
  217. pub fn create() -> Self {
  218. let data = Default::default();
  219. // Pre-create a template thread_env, so that we can clone the weak
  220. // pointer instead of downgrading frequently.
  221. let thread_env = ThreadEnv {
  222. data: Arc::downgrade(&data),
  223. #[cfg(all(not(test), feature = "integration-test"))]
  224. local_logger: thread_local_logger::get(),
  225. };
  226. Self {
  227. data,
  228. thread_env,
  229. stop_wait_group: Some(WaitGroup::new()),
  230. }
  231. }
  232. /// Creates a [`ThreadEnv`] that could be attached to a thread. Any code
  233. /// running in the thread can use this `DaemonEnv` to log errors. The thread
  234. /// must be stopped before `DaemonEnv::shutdown()` is called, otherwise it
  235. /// will panic when logging an error.
  236. pub fn for_thread(&self) -> ThreadEnv {
  237. self.thread_env.clone()
  238. }
  239. /// Creates a [`ThreadEnvGuard`] that registers this `DaemonEnv` in the
  240. /// current scope, which also remove it from the scope when dropped.
  241. pub fn for_scope(&self) -> ThreadEnvGuard {
  242. self.for_thread().attach();
  243. ThreadEnvGuard {}
  244. }
  245. }
  246. /// A weak reference to a [`DaemonEnv`] that is attached to a thread.
  247. /// Use [`ThreadEnv::attach`] to consume this instance and attach to a thread,
  248. /// and [`ThreadEnv::detach`] to undo that.
  249. #[derive(Clone, Debug, Default)]
  250. pub(crate) struct ThreadEnv {
  251. data: Weak<Mutex<DaemonEnvData>>,
  252. #[cfg(all(not(test), feature = "integration-test"))]
  253. local_logger: LocalLogger,
  254. }
  255. impl ThreadEnv {
  256. thread_local! {static ENV: RefCell<ThreadEnv> = Default::default()}
  257. /// Upgrade to the referenced [`DaemonEnv`].
  258. // The dance between Arc<> and Weak<> is complex, but useful:
  259. // 1) We do not have to worry about slow RPC threads causing
  260. // DaemonEnv::shutdown() to fail. They only hold a Weak<> pointer after all;
  261. // 2) We have one system that works both in the environments that we control
  262. // (daemon threads and our own thread pools), and in those we don't (RPC
  263. // handling methods);
  264. // 3) Utils (log_array, persister) can log errors without access to Raft;
  265. // 4) Because of 2), we do not need to expose DaemonEnv externally outside
  266. // this crate, even though there is a public macro referencing it.
  267. //
  268. // On the other hand, the cost is fairly small, because:
  269. // 1) Clone of weak is cheap: one branch plus one relaxed atomic load;
  270. // downgrade is more expensive, but we only do it once;
  271. // 2) Upgrade of weak is expensive, but that only happens when there is
  272. // an error, which should be (knock wood) rare;
  273. // 3) Set and unset a thread_local value is cheap, too.
  274. pub fn upgrade() -> DaemonEnv {
  275. let env = Self::ENV.with(|env| env.borrow().clone());
  276. DaemonEnv {
  277. data: env.data.upgrade().unwrap(),
  278. thread_env: env,
  279. stop_wait_group: None,
  280. }
  281. }
  282. /// Attach this instance to the current thread.
  283. pub fn attach(self) {
  284. #[cfg(all(not(test), feature = "integration-test"))]
  285. thread_local_logger::set(self.local_logger.clone());
  286. Self::ENV.with(|env| env.replace(self));
  287. }
  288. /// Detach the instance stored in the current thread.
  289. pub fn detach() {
  290. Self::ENV.with(|env| env.take());
  291. }
  292. }
  /// A guard that automatically cleans up the [`ThreadEnv`] attached to the
  /// current thread when dropped. It does not restore the previous value.
  #[must_use = "the environment is detached as soon as the guard is dropped"]
  pub(crate) struct ThreadEnvGuard {}
  296. impl Drop for ThreadEnvGuard {
  297. fn drop(&mut self) {
  298. ThreadEnv::detach()
  299. }
  300. }
  #[cfg(test)]
  mod tests {
      use super::*;

      /// Asserts that both environments share the same underlying data.
      fn assert_same_env(local_env: DaemonEnv, daemon_env: DaemonEnv) {
          assert!(Arc::ptr_eq(&local_env.data, &daemon_env.data));
      }

      /// An env attached on another thread upgrades to the original env.
      #[test]
      fn test_for_thread() {
          let daemon_env = DaemonEnv::create();
          let thread_env = daemon_env.for_thread();
          let join_handle = std::thread::spawn(|| {
              thread_env.attach();
              let local_env = ThreadEnv::upgrade();
              ThreadEnv::detach();
              local_env
          });
          // `join()` only fails when the spawned thread panicked.
          let local_env = join_handle
              .join()
              .expect("the thread attaching the env should not panic");
          assert_same_env(local_env, daemon_env);
      }

      /// A scope guard attaches the env and detaches it again when dropped.
      #[test]
      fn test_for_scope() {
          let daemon_env = DaemonEnv::create();
          let local_env = {
              let _guard = daemon_env.for_scope();
              ThreadEnv::upgrade()
          };
          assert_same_env(local_env, daemon_env);
          // A weak pointer with weak_count == 0 is a null weak pointer.
          ThreadEnv::ENV
              .with(|env| assert_eq!(env.borrow().data.weak_count(), 0));
      }

      /// A failed check is stored in the env attached to the current thread.
      #[test]
      fn test_record_error() {
          let daemon_env = DaemonEnv::create();
          {
              let _guard = daemon_env.for_scope();
              let state = RaftState::<i32>::create(1, Peer(0));
              check_or_record!(
                  0 > 1,
                  ErrorKind::SnapshotAfterLogEnd(1),
                  "Just kidding",
                  &state
              );
          }
          let guard = daemon_env.data.lock();
          let errors = &guard.errors;
          assert_eq!(errors.len(), 1);
          assert!(matches!(
              errors[0].error_kind,
              ErrorKind::SnapshotAfterLogEnd(1)
          ));
          assert_eq!(&errors[0].message, "Just kidding");
      }

      /// Shutdown reports daemon panics with both `&str` and `String` payloads.
      #[test]
      fn test_watch_daemon_shutdown() {
          let daemon_env = DaemonEnv::create();
          daemon_env.watch_daemon(Daemon::ApplyCommand, || {
              panic!("message with type &str");
          });
          daemon_env.watch_daemon(Daemon::Snapshot, || {
              panic!("message with type {:?}", "debug string");
          });
          // Run shutdown on its own thread so its panic can be inspected.
          let result = std::thread::spawn(move || {
              daemon_env.shutdown();
          })
          .join();
          let message = result.expect_err("shutdown should have panicked");
          let message = message
              .downcast_ref::<String>()
              .expect("Error message should be a string.");
          assert_eq!(
              message,
              "\n2 daemon panic(s):\nDaemon ApplyCommand panicked: message with type &str\nDaemon Snapshot panicked: message with type \"debug string\"\n0 error(s):\n"
          );
      }
  }