daemon_env.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. use std::cell::RefCell;
  2. use std::sync::{Arc, Weak};
  3. use parking_lot::Mutex;
  4. use crate::index_term::IndexTerm;
  5. use crate::{Peer, RaftState, State, Term};
  /// Records an error in the daemon environment attached to the current thread
  /// when `$condition` evaluates to `false`.
  ///
  /// Arguments: the condition to check, the `ErrorKind`, a message (anything
  /// implementing `AsRef<str>`), and a reference to the Raft state, which is
  /// stripped and stored alongside the error. The call site's `file:line` is
  /// recorded automatically.
  ///
  /// Panics if no live daemon environment is attached to the current thread.
  #[macro_export]
  macro_rules! check_or_record {
      ($condition:expr, $error_kind:expr, $message:expr, $rf:expr $(,)?) => {
          if !$condition {
              // `$crate` (not `crate`) so the path resolves to this crate no
              // matter where the macro is invoked from.
              $crate::daemon_env::ThreadEnv::upgrade().record_error(
                  $error_kind,
                  $message,
                  $rf,
                  concat!(file!(), ":", line!()),
              )
          }
      };
  }
  20. /// Environment for daemons.
  21. ///
  22. /// Each daemon thread should hold a copy of this struct, either directly or
  23. /// through a copy of [`crate::Raft`]. It can be used for logging unexpected
  24. /// errors to a central location, which cause a failure at shutdown. It also
  25. /// checks daemon thread panics and collect information if they do.
  26. #[derive(Clone, Debug)]
  27. pub(crate) struct DaemonEnv {
  28. data: Arc<Mutex<DaemonEnvData>>,
  29. thread_env: ThreadEnv,
  30. }
  31. #[derive(Debug, Default)]
  32. struct DaemonEnvData {
  33. errors: Vec<Error>,
  34. daemons: Vec<std::thread::JoinHandle<()>>,
  35. }
  36. #[derive(Debug)]
  37. pub(crate) struct Error {
  38. error_kind: ErrorKind,
  39. message: String,
  40. raft_state: StrippedRaftState,
  41. file_line: &'static str,
  42. }
  43. #[derive(Debug)]
  44. pub(crate) enum ErrorKind {
  45. /// The leader sent log entries that do not match a committed log entry.
  46. /// It could be caused by a misbehaving leader, or that the leader should
  47. /// never have been elected, or that this Raft instance incorrectly moved
  48. /// its commit index.
  49. RollbackCommitted(usize),
  50. /// Similar to [`Self::RollbackCommitted`], but the leader sent a snapshot
  51. /// that is inconsistent with a committed log entry.
  52. SnapshotBeforeCommitted(usize, Term),
  53. /// The application sent a snapshot that contains items beyond the log end.
  54. SnapshotAfterLogEnd(usize),
  55. /// The application sent a snapshot that contains items that are not
  56. /// committed yet. Only committed items are sent to the application.
  57. SnapshotNotCommitted(usize),
  58. /// The recipient of [`crate::InstallSnapshot`] should have been able to
  59. /// verify the term at index `.0` but did not. The index `.0` is after
  60. /// their commit index `.1`, and thus not yet committed or archived into a
  61. /// local snapshot. The recipient should still have the log entry at `.0`.
  62. RefusedSnapshotAfterCommitted(usize, usize),
  63. /// Similar to [`Self::RollbackCommitted`], but this error is logged by the
  64. /// leader after receiving a reply from a follower.
  65. DivergedBeforeCommitted(usize, usize),
  66. /// A follower committed a log entry that is different from the leader. An
  67. /// opportunistic check that looks for log mismatches, missing committed log
  68. /// entries or other corruptions.
  69. DivergedAtCommitted(usize),
  70. }
  71. impl DaemonEnv {
  72. /// Record an error, with a stripped version of the state of this instance.
  73. /// Use macro `check_or_record` to auto populate the environment.
  74. pub fn record_error<T, S: AsRef<str>>(
  75. &self,
  76. error_kind: ErrorKind,
  77. message: S,
  78. raft_state: &RaftState<T>,
  79. file_line: &'static str,
  80. ) {
  81. self.data.lock().errors.push(Error {
  82. error_kind,
  83. message: message.as_ref().into(),
  84. raft_state: Self::strip_data(raft_state),
  85. file_line,
  86. })
  87. }
  88. /// Register a daemon thread to make sure it is correctly shutdown when the
  89. /// Raft instance is killed.
  90. pub fn watch_daemon(&self, thread: std::thread::JoinHandle<()>) {
  91. self.data.lock().daemons.push(thread);
  92. }
  93. /// Makes sure that all daemons have been shutdown, no more errors can be
  94. /// added, checks if any error has been added, or if any daemon panicked.
  95. pub fn shutdown(self) {
  96. let data = Arc::try_unwrap(self.data)
  97. .unwrap_or_else(|_| {
  98. panic!("No one should be holding daemon env at shutdown.")
  99. })
  100. .into_inner();
  101. let daemon_panics: Vec<String> = data
  102. .daemons
  103. .into_iter()
  104. .filter_map(|join_handle| {
  105. let err = join_handle.join().err()?;
  106. let err_str = err.downcast_ref::<&str>().map(|s| s.to_owned());
  107. let err_string =
  108. err.downcast_ref::<String>().map(|s| s.as_str());
  109. let err =
  110. err_str.or(err_string).unwrap_or("unknown panic error");
  111. Some("\n".to_owned() + err)
  112. })
  113. .collect();
  114. let recorded_errors: Vec<String> = data
  115. .errors
  116. .iter()
  117. .map(|error| format!("\n{:?}", error))
  118. .collect();
  119. if !daemon_panics.is_empty() || !recorded_errors.is_empty() {
  120. // Do not panic again if we are cleaning up panicking threads.
  121. if std::thread::panicking() {
  122. eprintln!(
  123. "\n{} daemon panic(s):{}\n{} error(s):{}\n",
  124. daemon_panics.len(),
  125. daemon_panics.join(""),
  126. recorded_errors.len(),
  127. recorded_errors.join("")
  128. )
  129. } else {
  130. panic!(
  131. "\n{} daemon panic(s):{}\n{} error(s):{}\n",
  132. daemon_panics.len(),
  133. daemon_panics.join(""),
  134. recorded_errors.len(),
  135. recorded_errors.join("")
  136. )
  137. }
  138. }
  139. }
  140. fn strip_data<T>(raft: &RaftState<T>) -> StrippedRaftState {
  141. StrippedRaftState {
  142. current_term: raft.current_term,
  143. voted_for: raft.voted_for,
  144. log: raft.log.all().iter().map(|s| s.into()).collect(),
  145. commit_index: raft.commit_index,
  146. last_applied: raft.last_applied,
  147. state: raft.state,
  148. leader_id: raft.leader_id,
  149. }
  150. }
  151. }
  152. #[derive(Debug)]
  153. struct StrippedRaftState {
  154. current_term: Term,
  155. voted_for: Option<Peer>,
  156. log: Vec<IndexTerm>,
  157. commit_index: usize,
  158. last_applied: usize,
  159. state: State,
  160. leader_id: Peer,
  161. }
  162. impl DaemonEnv {
  163. /// Creates a daemon environment. Each Raft instance should share the same
  164. /// environment. It should be added to any thread that executes Raft code.
  165. /// Use [`DaemonEnv::for_thread`] or [`DaemonEnv::for_scope`] to register
  166. /// the environment.
  167. pub fn create() -> Self {
  168. let data = Default::default();
  169. // Pre-create a template thread_env, so that we can clone the weak
  170. // pointer instead of downgrading frequently.
  171. let thread_env = ThreadEnv {
  172. data: Arc::downgrade(&data),
  173. };
  174. Self { data, thread_env }
  175. }
  176. /// Creates a [`ThreadEnv`] that could be attached to a thread. Any code
  177. /// running in the thread can use this `DaemonEnv` to log errors. The thread
  178. /// must be stopped before `DaemonEnv::shutdown()` is called, otherwise it
  179. /// will panic when logging an error.
  180. pub fn for_thread(&self) -> ThreadEnv {
  181. self.thread_env.clone()
  182. }
  183. /// Creates a [`ThreadEnvGuard`] that registers this `DaemonEnv` in the
  184. /// current scope, which also remove it from the scope when dropped.
  185. pub fn for_scope(&self) -> ThreadEnvGuard {
  186. self.for_thread().attach();
  187. ThreadEnvGuard {}
  188. }
  189. }
  190. /// A weak reference to a [`DaemonEnv`] that is attached to a thread.
  191. /// Use [`ThreadEnv::attach`] to consume this instance and attach to a thread,
  192. /// and [`ThreadEnv::detach`] to undo that.
  193. #[derive(Clone, Debug, Default)]
  194. pub(crate) struct ThreadEnv {
  195. data: Weak<Mutex<DaemonEnvData>>,
  196. }
  197. impl ThreadEnv {
  198. thread_local! {static ENV: RefCell<ThreadEnv> = Default::default()}
  199. /// Upgrade to the referenced [`DaemonEvn`].
  200. // The dance between Arc<> and Weak<> is complex, but useful:
  201. // 1) We do not have to worry about slow RPC threads causing
  202. // DaemonEnv::shutdown() to fail. They only hold a Weak<> pointer after all;
  203. // 2) We have one system that works both in the environments that we control
  204. // (daemon threads and our own thread pools), and in those we don't (RPC
  205. // handling methods);
  206. // 3) Utils (log_array, persister) can log errors without access to Raft;
  207. // 4) Because of 2), we do not need to expose DaemonEnv externally outside
  208. // this crate, even though there is a public macro referencing it.
  209. //
  210. // On the other hand, the cost is fairly small, because:
  211. // 1) Clone of weak is cheap: one branch plus one relaxed atomic load;
  212. // downgrade is more expensive, but we only do it once;
  213. // 2) Upgrade of weak is expensive, but that only happens when there is
  214. // an error, which should be (knock wood) rare;
  215. // 3) Set and unset a thread_local value is cheap, too.
  216. pub fn upgrade() -> DaemonEnv {
  217. let env = Self::ENV.with(|env| env.borrow().clone());
  218. DaemonEnv {
  219. data: env.data.upgrade().unwrap(),
  220. thread_env: env,
  221. }
  222. }
  223. /// Attach this instance to the current thread.
  224. pub fn attach(self) {
  225. Self::ENV.with(|env| env.replace(self));
  226. }
  227. /// Detach the instance stored in the current thread.
  228. pub fn detach() {
  229. Self::ENV.with(|env| env.take());
  230. }
  231. }
  /// A guard that detaches the [`ThreadEnv`] attached to the current thread
  /// when dropped. It does not restore any previously attached value.
  ///
  /// Bind it to a named variable (e.g. `_guard`): `let _ = ...` drops it
  /// immediately and detaches the environment right away.
  #[must_use = "the ThreadEnv is detached as soon as this guard is dropped"]
  pub(crate) struct ThreadEnvGuard {}
  235. impl Drop for ThreadEnvGuard {
  236. fn drop(&mut self) {
  237. ThreadEnv::detach()
  238. }
  239. }
  #[cfg(test)]
  mod tests {
      use super::*;

      fn assert_same_env(local_env: DaemonEnv, daemon_env: DaemonEnv) {
          assert!(Arc::ptr_eq(&local_env.data, &daemon_env.data));
      }

      #[test]
      fn test_for_thread() {
          let daemon_env = DaemonEnv::create();
          let thread_env = daemon_env.for_thread();
          let join_handle = std::thread::spawn(|| {
              thread_env.attach();
              let local_env = ThreadEnv::upgrade();
              ThreadEnv::detach();
              local_env
          });
          let local_env = join_handle
              .join()
              .expect("the spawned thread should not panic");
          assert_same_env(local_env, daemon_env);
      }

      #[test]
      fn test_for_scope() {
          let daemon_env = DaemonEnv::create();
          let local_env = {
              let _guard = daemon_env.for_scope();
              ThreadEnv::upgrade()
          };
          // The guard is gone, so the thread-local env must have been reset to
          // a dangling `Weak::new()`, whose weak_count() is 0. This must be
          // checked while `daemon_env` is still alive: once no strong
          // reference remains, weak_count() returns 0 for every weak pointer
          // and the check would pass even if detach() never ran.
          ThreadEnv::ENV
              .with(|env| assert_eq!(env.borrow().data.weak_count(), 0));
          assert_same_env(local_env, daemon_env);
      }

      #[test]
      fn test_record_error() {
          let daemon_env = DaemonEnv::create();
          {
              let _guard = daemon_env.for_scope();
              let state = RaftState::<i32>::create(1, Peer(0));
              check_or_record!(
                  0 > 1,
                  ErrorKind::SnapshotAfterLogEnd(1),
                  "Just kidding",
                  &state
              )
          }
          let guard = daemon_env.data.lock();
          let errors = &guard.errors;
          assert_eq!(errors.len(), 1);
          assert!(matches!(
              errors[0].error_kind,
              ErrorKind::SnapshotAfterLogEnd(1)
          ));
          assert_eq!(&errors[0].message, "Just kidding");
      }

      #[test]
      fn test_watch_daemon_shutdown() {
          let daemon_env = DaemonEnv::create();
          let panic_thread = std::thread::spawn(|| {
              panic!("message with type &str");
          });
          daemon_env.watch_daemon(panic_thread);
          let another_panic_thread = std::thread::spawn(|| {
              panic!("message with type {:?}", "debug string");
          });
          daemon_env.watch_daemon(another_panic_thread);
          let result = std::thread::spawn(move || {
              daemon_env.shutdown();
          })
          .join();
          let message = result.expect_err("shutdown should have panicked");
          let message = message
              .downcast_ref::<String>()
              .expect("Error message should be a string.");
          assert_eq!(
              message,
              "\n2 daemon panic(s):\nmessage with type &str\nmessage with type \"debug string\"\n0 error(s):\n"
          );
      }
  }