daemon_env.rs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. use std::cell::RefCell;
  2. use std::sync::{Arc, Weak};
  3. use parking_lot::Mutex;
  4. #[cfg(all(not(test), feature = "integration-test"))]
  5. use test_utils::thread_local_logger::{self, LocalLogger};
  6. use crate::daemon_watch::Daemon;
  7. use crate::{IndexTerm, Peer, RaftState, State, Term};
  /// Records an error in the daemon environment of the current thread when
  /// a condition does not hold.
  ///
  /// Arguments, in order: the condition to check, the error kind, a message,
  /// and a reference to the Raft state to snapshot. The call site
  /// (`file:line`) is captured automatically. Nothing but the condition is
  /// evaluated when the condition is `true`.
  ///
  /// The environment is looked up through `ThreadEnv::upgrade()`.
  #[macro_export]
  macro_rules! check_or_record {
      ($cond:expr, $kind:expr, $msg:expr, $raft_state:expr) => {
          if !$cond {
              let env = $crate::daemon_env::ThreadEnv::upgrade();
              env.record_error(
                  $kind,
                  $msg,
                  $raft_state,
                  concat!(file!(), ":", line!()),
              );
          }
      };
  }
  22. /// Environment for daemons.
  23. ///
  24. /// Each daemon thread should hold a copy of this struct, either directly or
  25. /// through a copy of [`crate::Raft`]. It can be used for logging unexpected
  26. /// errors to a central location, which cause a failure at shutdown. It also
  27. /// collects daemon thread panics, which are supplied by [`crate::DaemonWatch`].
  28. #[derive(Clone, Debug)]
  29. pub(crate) struct DaemonEnv {
  30. data: Arc<Mutex<DaemonEnvData>>,
  31. thread_env: ThreadEnv,
  32. }
  33. #[derive(Debug, Default)]
  34. struct DaemonEnvData {
  35. errors: Vec<Error>,
  36. panics: Vec<String>,
  37. }
  38. #[allow(dead_code)]
  39. #[derive(Debug)]
  40. pub(crate) struct Error {
  41. error_kind: ErrorKind,
  42. message: String,
  43. raft_state: StrippedRaftState,
  44. file_line: &'static str,
  45. }
  46. #[allow(dead_code)]
  47. #[derive(Debug)]
  48. pub(crate) enum ErrorKind {
  49. /// The leader sent log entries that do not match a committed log entry.
  50. /// It could be caused by a misbehaving leader, or that the leader should
  51. /// never have been elected, or that this Raft instance incorrectly moved
  52. /// its commit index.
  53. RollbackCommitted(usize),
  54. /// Similar to [`Self::RollbackCommitted`], but the leader sent a snapshot
  55. /// that is inconsistent with a committed log entry.
  56. SnapshotBeforeCommitted(usize, Term),
  57. /// The application sent a snapshot that contains items beyond the log end.
  58. SnapshotAfterLogEnd(usize),
  59. /// The application sent a snapshot that contains items that are not
  60. /// committed yet. Only committed items are sent to the application.
  61. SnapshotNotCommitted(usize),
  62. /// The recipient of the `InstallSnapshot` RPC should have been able to
  63. /// verify the term at index `.0` but did not. The index `.0` is after
  64. /// their commit index `.1`, and thus not yet committed or archived into a
  65. /// local snapshot. The recipient should still have the log entry at `.0`.
  66. RefusedSnapshotAfterCommitted(usize, usize),
  67. /// Similar to [`Self::RollbackCommitted`], but this error is logged by the
  68. /// leader after receiving a reply from a follower.
  69. DivergedBeforeCommitted(usize, usize),
  70. /// A follower committed a log entry that is different from the leader. An
  71. /// opportunistic check that looks for log mismatches, missing committed log
  72. /// entries or other corruptions.
  73. DivergedAtCommitted(usize),
  74. /// A follower received an AppendEntries RPC with a `prev_log_index` that
  75. /// is inconsistent with the index of entries included in the same RPC.
  76. AppendEntriesIndexMismatch(usize, Vec<IndexTerm>),
  77. /// A follower committed a log entry that is beyond the log end of the
  78. /// leader. An opportunistic check that looks for log mismatches, missing
  79. /// committed log entries or other corruptions.
  80. CommittedBeyondEnd(usize),
  81. /// When examining a sync log entry response from a follower, the leader
  82. /// noticed that a log entry that it sent out is no longer in its own log.
  83. LeaderLogShrunk(usize),
  84. }
  85. impl DaemonEnv {
  86. /// Record an error, with a stripped version of the state of this instance.
  87. /// Use macro `check_or_record` to auto populate the environment.
  88. pub fn record_error<T, S: AsRef<str>>(
  89. &self,
  90. error_kind: ErrorKind,
  91. message: S,
  92. raft_state: &RaftState<T>,
  93. file_line: &'static str,
  94. ) {
  95. self.data.lock().errors.push(Error {
  96. error_kind,
  97. message: message.as_ref().into(),
  98. raft_state: Self::strip_data(raft_state),
  99. file_line,
  100. })
  101. }
  102. pub fn record_panic(&self, daemon: Daemon, err: &str) {
  103. self.data
  104. .lock()
  105. .panics
  106. .push(format!("\nDaemon {:?} panicked: {}", daemon, err));
  107. }
  108. /// Makes sure that all daemons have been shutdown, no more errors can be
  109. /// added, checks if any error has been added, or if any daemon panicked.
  110. pub fn shutdown(self) {
  111. let data = Arc::try_unwrap(self.data)
  112. .unwrap_or_else(|_| {
  113. panic!("No one should be holding daemon env at shutdown.")
  114. })
  115. .into_inner();
  116. let daemon_panics: Vec<String> = data.panics;
  117. let recorded_errors: Vec<String> = data
  118. .errors
  119. .iter()
  120. .map(|error| format!("\n{:?}", error))
  121. .collect();
  122. if !daemon_panics.is_empty() || !recorded_errors.is_empty() {
  123. // Do not panic again if we are cleaning up panicking threads.
  124. if std::thread::panicking() {
  125. eprintln!(
  126. "\n{} daemon panic(s):{}\n{} error(s):{}\n",
  127. daemon_panics.len(),
  128. daemon_panics.join(""),
  129. recorded_errors.len(),
  130. recorded_errors.join("")
  131. )
  132. } else {
  133. panic!(
  134. "\n{} daemon panic(s):{}\n{} error(s):{}\n",
  135. daemon_panics.len(),
  136. daemon_panics.join(""),
  137. recorded_errors.len(),
  138. recorded_errors.join("")
  139. )
  140. }
  141. }
  142. }
  143. fn strip_data<T>(raft: &RaftState<T>) -> StrippedRaftState {
  144. StrippedRaftState {
  145. current_term: raft.current_term,
  146. voted_for: raft.voted_for,
  147. log: raft.log.all_index_term(),
  148. commit_index: raft.commit_index,
  149. state: raft.state,
  150. leader_id: raft.leader_id,
  151. }
  152. }
  153. }
  154. #[allow(dead_code)]
  155. #[derive(Debug)]
  156. struct StrippedRaftState {
  157. current_term: Term,
  158. voted_for: Option<Peer>,
  159. log: Vec<IndexTerm>,
  160. commit_index: usize,
  161. state: State,
  162. leader_id: Peer,
  163. }
  164. impl DaemonEnv {
  165. /// Creates a daemon environment. Each Raft instance should share the same
  166. /// environment. It should be added to any thread that executes Raft code.
  167. /// Use [`DaemonEnv::for_thread`] or [`DaemonEnv::for_scope`] to register
  168. /// the environment.
  169. pub fn create() -> Self {
  170. let data = Default::default();
  171. // Pre-create a template thread_env, so that we can clone the weak
  172. // pointer instead of downgrading frequently.
  173. let thread_env = ThreadEnv {
  174. data: Arc::downgrade(&data),
  175. #[cfg(all(not(test), feature = "integration-test"))]
  176. local_logger: thread_local_logger::get(),
  177. };
  178. Self { data, thread_env }
  179. }
  180. /// Creates a [`ThreadEnv`] that could be attached to a thread. Any code
  181. /// running in the thread can use this `DaemonEnv` to log errors. The thread
  182. /// must be stopped before `DaemonEnv::shutdown()` is called, otherwise it
  183. /// will panic when logging an error.
  184. pub fn for_thread(&self) -> ThreadEnv {
  185. self.thread_env.clone()
  186. }
  187. /// Creates a [`ThreadEnvGuard`] that registers this `DaemonEnv` in the
  188. /// current scope, which also remove it from the scope when dropped.
  189. pub fn for_scope(&self) -> ThreadEnvGuard {
  190. self.for_thread().attach();
  191. ThreadEnvGuard {}
  192. }
  193. }
  194. /// A weak reference to a [`DaemonEnv`] that is attached to a thread.
  195. /// Use [`ThreadEnv::attach`] to consume this instance and attach to a thread,
  196. /// and [`ThreadEnv::detach`] to undo that.
  197. #[derive(Clone, Debug, Default)]
  198. pub(crate) struct ThreadEnv {
  199. data: Weak<Mutex<DaemonEnvData>>,
  200. #[cfg(all(not(test), feature = "integration-test"))]
  201. local_logger: LocalLogger,
  202. }
  203. impl ThreadEnv {
  204. thread_local! {static ENV: RefCell<ThreadEnv> = Default::default()}
  205. /// Upgrade to the referenced [`DaemonEnv`].
  206. // The dance between Arc<> and Weak<> is complex, but useful:
  207. // 1) We do not have to worry about slow RPC threads causing
  208. // DaemonEnv::shutdown() to fail. They only hold a Weak<> pointer after all;
  209. // 2) We have one system that works both in the environments that we control
  210. // (daemon threads and our own thread pools), and in those we don't (RPC
  211. // handling methods);
  212. // 3) Utils (log_array, persister) can log errors without access to Raft;
  213. // 4) Because of 2), we do not need to expose DaemonEnv externally outside
  214. // this crate, even though there is a public macro referencing it.
  215. //
  216. // On the other hand, the cost is fairly small, because:
  217. // 1) Clone of weak is cheap: one branch plus one relaxed atomic load;
  218. // downgrade is more expensive, but we only do it once;
  219. // 2) Upgrade of weak is expensive, but that only happens when there is
  220. // an error, which should be (knock wood) rare;
  221. // 3) Set and unset a thread_local value is cheap, too.
  222. pub fn upgrade() -> DaemonEnv {
  223. let env = Self::ENV.with(|env| env.borrow().clone());
  224. DaemonEnv {
  225. data: env.data.upgrade().unwrap(),
  226. thread_env: env,
  227. }
  228. }
  229. /// Attach this instance to the current thread.
  230. pub fn attach(self) {
  231. #[cfg(all(not(test), feature = "integration-test"))]
  232. thread_local_logger::set(self.local_logger.clone());
  233. Self::ENV.with(|env| env.replace(self));
  234. }
  235. /// Detach the instance stored in the current thread.
  236. pub fn detach() {
  237. Self::ENV.with(|env| env.take());
  238. }
  239. }
  240. /// A guard that automatically cleans up the [`ThreadEnv`] attached to the
  241. /// current thread when dropped. It does not restore the previous value.
  242. pub(crate) struct ThreadEnvGuard {}
  243. impl Drop for ThreadEnvGuard {
  244. fn drop(&mut self) {
  245. ThreadEnv::detach()
  246. }
  247. }
  #[cfg(test)]
  mod tests {
      use super::*;

      /// Asserts that both environments point to the same shared data.
      fn assert_same_env(local_env: DaemonEnv, daemon_env: DaemonEnv) {
          assert!(Arc::ptr_eq(&local_env.data, &daemon_env.data));
      }

      /// An env attached in another thread upgrades to the original env.
      #[test]
      fn test_for_thread() {
          let daemon_env = DaemonEnv::create();
          let thread_env = daemon_env.for_thread();
          let worker = std::thread::spawn(move || {
              thread_env.attach();
              let upgraded = ThreadEnv::upgrade();
              ThreadEnv::detach();
              upgraded
          });
          let local_env = worker
              .join()
              .expect("local env should be the same as daemon_env");
          assert_same_env(local_env, daemon_env);
      }

      /// The scope guard attaches the env, and detaches it when dropped.
      #[test]
      fn test_for_scope() {
          let daemon_env = DaemonEnv::create();

          let guard = daemon_env.for_scope();
          let local_env = ThreadEnv::upgrade();
          drop(guard);

          assert_same_env(local_env, daemon_env);
          // A weak pointer with weak_count == 0 is a null weak pointer.
          ThreadEnv::ENV.with(|env| {
              assert_eq!(env.borrow().data.weak_count(), 0);
          });
      }

      /// A failed check ends up in the error list of the attached env.
      #[test]
      fn test_record_error() {
          let daemon_env = DaemonEnv::create();
          {
              let _guard = daemon_env.for_scope();
              let state = RaftState::<i32>::create(1, Peer(0));
              check_or_record!(
                  0 > 1,
                  ErrorKind::SnapshotAfterLogEnd(1),
                  "Just kidding",
                  &state
              );
          }

          let data = daemon_env.data.lock();
          assert_eq!(data.errors.len(), 1);
          let recorded = &data.errors[0];
          assert!(matches!(
              recorded.error_kind,
              ErrorKind::SnapshotAfterLogEnd(1)
          ));
          assert_eq!(recorded.message, "Just kidding");
      }

      /// Recorded panics make `shutdown()` panic with a full report.
      #[test]
      fn test_record_panic() {
          let daemon_env = DaemonEnv::create();
          daemon_env.record_panic(Daemon::ApplyCommand, "message with type &str");
          let formatted = format!("message with type {:?}", "debug string");
          daemon_env.record_panic(Daemon::Snapshot, formatted.as_str());

          // Shut down in a separate thread so that the panic can be inspected.
          let outcome = std::thread::spawn(move || daemon_env.shutdown()).join();
          let payload = outcome.expect_err("shutdown should have panicked");
          let message = payload
              .downcast_ref::<String>()
              .expect("Error message should be a string.");
          assert_eq!(
              message,
              "\n2 daemon panic(s):\nDaemon ApplyCommand panicked: message with type &str\nDaemon Snapshot panicked: message with type \"debug string\"\n0 error(s):\n"
          );
      }
  }