// daemon_env.rs (13 KB)

  1. use std::cell::RefCell;
  2. use std::sync::{Arc, Weak};
  3. use parking_lot::Mutex;
  4. use crate::index_term::IndexTerm;
  5. use crate::{Peer, RaftState, State, Term};
  6. #[cfg(all(not(test), feature = "integration-test"))]
  7. use test_utils::thread_local_logger::{self, LocalLogger};
  /// Records an error in the `DaemonEnv` attached to the current thread when
  /// `$condition` evaluates to `false`.
  ///
  /// * `$condition` - the invariant that is expected to hold.
  /// * `$error_kind` - an `ErrorKind` describing the violation.
  /// * `$message` - any value implementing `AsRef<str>`.
  /// * `$rf` - a reference to the `RaftState` that is snapshotted into the
  ///   recorded error.
  ///
  /// `$error_kind`, `$message` and `$rf` are only evaluated when the condition
  /// fails. The call site is captured automatically as `"file:line"`.
  ///
  /// # Panics
  ///
  /// When the condition fails and no live `DaemonEnv` is attached to the
  /// current thread (see `ThreadEnv::upgrade`).
  #[macro_export]
  macro_rules! check_or_record {
      ($condition:expr, $error_kind:expr, $message:expr, $rf:expr) => {
          if !$condition {
              // `$crate` (not `crate`) so that the path always resolves to the
              // crate that defines this exported macro, wherever it expands.
              $crate::daemon_env::ThreadEnv::upgrade().record_error(
                  $error_kind,
                  $message,
                  $rf,
                  concat!(file!(), ":", line!()),
              )
          }
      };
  }
  22. /// Environment for daemons.
  23. ///
  24. /// Each daemon thread should hold a copy of this struct, either directly or
  25. /// through a copy of [`crate::Raft`]. It can be used for logging unexpected
  26. /// errors to a central location, which cause a failure at shutdown. It also
  27. /// checks daemon thread panics and collect information if they do.
  28. #[derive(Clone, Debug)]
  29. pub(crate) struct DaemonEnv {
  30. data: Arc<Mutex<DaemonEnvData>>,
  31. thread_env: ThreadEnv,
  32. }
  /// Names the kind of daemon thread behind a registered join handle. The
  /// name appears in the shutdown report if that thread panicked.
  #[derive(Debug)]
  pub(crate) enum Daemon {
      Snapshot,
      ElectionTimer,
      SyncLogEntries,
      ApplyCommand,
  }
  40. #[derive(Debug, Default)]
  41. struct DaemonEnvData {
  42. errors: Vec<Error>,
  43. daemons: Vec<(Daemon, std::thread::JoinHandle<()>)>,
  44. }
  45. #[derive(Debug)]
  46. pub(crate) struct Error {
  47. error_kind: ErrorKind,
  48. message: String,
  49. raft_state: StrippedRaftState,
  50. file_line: &'static str,
  51. }
  52. #[derive(Debug)]
  53. pub(crate) enum ErrorKind {
  54. /// The leader sent log entries that do not match a committed log entry.
  55. /// It could be caused by a misbehaving leader, or that the leader should
  56. /// never have been elected, or that this Raft instance incorrectly moved
  57. /// its commit index.
  58. RollbackCommitted(usize),
  59. /// Similar to [`Self::RollbackCommitted`], but the leader sent a snapshot
  60. /// that is inconsistent with a committed log entry.
  61. SnapshotBeforeCommitted(usize, Term),
  62. /// The application sent a snapshot that contains items beyond the log end.
  63. SnapshotAfterLogEnd(usize),
  64. /// The application sent a snapshot that contains items that are not
  65. /// committed yet. Only committed items are sent to the application.
  66. SnapshotNotCommitted(usize),
  67. /// The recipient of the `InstallSnapshot` RPC should have been able to
  68. /// verify the term at index `.0` but did not. The index `.0` is after
  69. /// their commit index `.1`, and thus not yet committed or archived into a
  70. /// local snapshot. The recipient should still have the log entry at `.0`.
  71. RefusedSnapshotAfterCommitted(usize, usize),
  72. /// Similar to [`Self::RollbackCommitted`], but this error is logged by the
  73. /// leader after receiving a reply from a follower.
  74. DivergedBeforeCommitted(usize, usize),
  75. /// A follower committed a log entry that is different from the leader. An
  76. /// opportunistic check that looks for log mismatches, missing committed log
  77. /// entries or other corruptions.
  78. DivergedAtCommitted(usize),
  79. /// A follower received an AppendEntries RPC with a `prev_log_index` that
  80. /// is inconsistent with the index of entries included in the same RPC.
  81. AppendEntriesIndexMismatch(usize, Vec<IndexTerm>),
  82. /// A follower committed a log entry that is beyond the log end of the
  83. /// leader. An opportunistic check that looks for log mismatches, missing
  84. /// committed log entries or other corruptions.
  85. CommittedBeyondEnd(usize),
  86. /// When examining a sync log entry response from a follower, the leader
  87. /// noticed that a log entry that it sent out is no longer in its own log.
  88. LeaderLogShrunk(usize),
  89. }
  90. impl DaemonEnv {
  91. /// Record an error, with a stripped version of the state of this instance.
  92. /// Use macro `check_or_record` to auto populate the environment.
  93. pub fn record_error<T, S: AsRef<str>>(
  94. &self,
  95. error_kind: ErrorKind,
  96. message: S,
  97. raft_state: &RaftState<T>,
  98. file_line: &'static str,
  99. ) {
  100. self.data.lock().errors.push(Error {
  101. error_kind,
  102. message: message.as_ref().into(),
  103. raft_state: Self::strip_data(raft_state),
  104. file_line,
  105. })
  106. }
  107. /// Register a daemon thread to make sure it is correctly shutdown when the
  108. /// Raft instance is killed.
  109. pub fn watch_daemon(
  110. &self,
  111. daemon: Daemon,
  112. thread: std::thread::JoinHandle<()>,
  113. ) {
  114. self.data.lock().daemons.push((daemon, thread));
  115. }
  116. /// Makes sure that all daemons have been shutdown, no more errors can be
  117. /// added, checks if any error has been added, or if any daemon panicked.
  118. pub fn shutdown(self) {
  119. let data = Arc::try_unwrap(self.data)
  120. .unwrap_or_else(|_| {
  121. panic!("No one should be holding daemon env at shutdown.")
  122. })
  123. .into_inner();
  124. let daemon_panics: Vec<String> = data
  125. .daemons
  126. .into_iter()
  127. .filter_map(|(daemon, join_handle)| {
  128. let err = join_handle.join().err()?;
  129. let err_str = err.downcast_ref::<&str>().map(|s| s.to_owned());
  130. let err_string =
  131. err.downcast_ref::<String>().map(|s| s.as_str());
  132. let err =
  133. err_str.or(err_string).unwrap_or("unknown panic error");
  134. Some(format!("\nDaemon {:?} panicked: {}", daemon, err))
  135. })
  136. .collect();
  137. let recorded_errors: Vec<String> = data
  138. .errors
  139. .iter()
  140. .map(|error| format!("\n{:?}", error))
  141. .collect();
  142. if !daemon_panics.is_empty() || !recorded_errors.is_empty() {
  143. // Do not panic again if we are cleaning up panicking threads.
  144. if std::thread::panicking() {
  145. eprintln!(
  146. "\n{} daemon panic(s):{}\n{} error(s):{}\n",
  147. daemon_panics.len(),
  148. daemon_panics.join(""),
  149. recorded_errors.len(),
  150. recorded_errors.join("")
  151. )
  152. } else {
  153. panic!(
  154. "\n{} daemon panic(s):{}\n{} error(s):{}\n",
  155. daemon_panics.len(),
  156. daemon_panics.join(""),
  157. recorded_errors.len(),
  158. recorded_errors.join("")
  159. )
  160. }
  161. }
  162. }
  163. fn strip_data<T>(raft: &RaftState<T>) -> StrippedRaftState {
  164. StrippedRaftState {
  165. current_term: raft.current_term,
  166. voted_for: raft.voted_for,
  167. log: raft.log.all_index_term(),
  168. commit_index: raft.commit_index,
  169. last_applied: raft.last_applied,
  170. state: raft.state,
  171. leader_id: raft.leader_id,
  172. }
  173. }
  174. }
  175. #[derive(Debug)]
  176. struct StrippedRaftState {
  177. current_term: Term,
  178. voted_for: Option<Peer>,
  179. log: Vec<IndexTerm>,
  180. commit_index: usize,
  181. last_applied: usize,
  182. state: State,
  183. leader_id: Peer,
  184. }
  185. impl DaemonEnv {
  186. /// Creates a daemon environment. Each Raft instance should share the same
  187. /// environment. It should be added to any thread that executes Raft code.
  188. /// Use [`DaemonEnv::for_thread`] or [`DaemonEnv::for_scope`] to register
  189. /// the environment.
  190. pub fn create() -> Self {
  191. let data = Default::default();
  192. // Pre-create a template thread_env, so that we can clone the weak
  193. // pointer instead of downgrading frequently.
  194. let thread_env = ThreadEnv {
  195. data: Arc::downgrade(&data),
  196. #[cfg(all(not(test), feature = "integration-test"))]
  197. local_logger: thread_local_logger::get(),
  198. };
  199. Self { data, thread_env }
  200. }
  201. /// Creates a [`ThreadEnv`] that could be attached to a thread. Any code
  202. /// running in the thread can use this `DaemonEnv` to log errors. The thread
  203. /// must be stopped before `DaemonEnv::shutdown()` is called, otherwise it
  204. /// will panic when logging an error.
  205. pub fn for_thread(&self) -> ThreadEnv {
  206. self.thread_env.clone()
  207. }
  208. /// Creates a [`ThreadEnvGuard`] that registers this `DaemonEnv` in the
  209. /// current scope, which also remove it from the scope when dropped.
  210. pub fn for_scope(&self) -> ThreadEnvGuard {
  211. self.for_thread().attach();
  212. ThreadEnvGuard {}
  213. }
  214. }
  215. /// A weak reference to a [`DaemonEnv`] that is attached to a thread.
  216. /// Use [`ThreadEnv::attach`] to consume this instance and attach to a thread,
  217. /// and [`ThreadEnv::detach`] to undo that.
  218. #[derive(Clone, Debug, Default)]
  219. pub(crate) struct ThreadEnv {
  220. data: Weak<Mutex<DaemonEnvData>>,
  221. #[cfg(all(not(test), feature = "integration-test"))]
  222. local_logger: LocalLogger,
  223. }
  224. impl ThreadEnv {
  225. thread_local! {static ENV: RefCell<ThreadEnv> = Default::default()}
  226. /// Upgrade to the referenced [`DaemonEnv`].
  227. // The dance between Arc<> and Weak<> is complex, but useful:
  228. // 1) We do not have to worry about slow RPC threads causing
  229. // DaemonEnv::shutdown() to fail. They only hold a Weak<> pointer after all;
  230. // 2) We have one system that works both in the environments that we control
  231. // (daemon threads and our own thread pools), and in those we don't (RPC
  232. // handling methods);
  233. // 3) Utils (log_array, persister) can log errors without access to Raft;
  234. // 4) Because of 2), we do not need to expose DaemonEnv externally outside
  235. // this crate, even though there is a public macro referencing it.
  236. //
  237. // On the other hand, the cost is fairly small, because:
  238. // 1) Clone of weak is cheap: one branch plus one relaxed atomic load;
  239. // downgrade is more expensive, but we only do it once;
  240. // 2) Upgrade of weak is expensive, but that only happens when there is
  241. // an error, which should be (knock wood) rare;
  242. // 3) Set and unset a thread_local value is cheap, too.
  243. pub fn upgrade() -> DaemonEnv {
  244. let env = Self::ENV.with(|env| env.borrow().clone());
  245. DaemonEnv {
  246. data: env.data.upgrade().unwrap(),
  247. thread_env: env,
  248. }
  249. }
  250. /// Attach this instance to the current thread.
  251. pub fn attach(self) {
  252. #[cfg(all(not(test), feature = "integration-test"))]
  253. thread_local_logger::set(self.local_logger.clone());
  254. Self::ENV.with(|env| env.replace(self));
  255. }
  256. /// Detach the instance stored in the current thread.
  257. pub fn detach() {
  258. Self::ENV.with(|env| env.take());
  259. }
  260. }
  261. /// A guard that automatically cleans up the [`ThreadEnv`] attached to the
  262. /// current thread when dropped. It does not restore the previous value.
  263. pub(crate) struct ThreadEnvGuard {}
  264. impl Drop for ThreadEnvGuard {
  265. fn drop(&mut self) {
  266. ThreadEnv::detach()
  267. }
  268. }
  #[cfg(test)]
  mod tests {
      use super::*;

      // Both environments must share the same underlying data. Takes
      // references so that callers keep the environments alive afterwards.
      fn assert_same_env(local_env: &DaemonEnv, daemon_env: &DaemonEnv) {
          assert!(Arc::ptr_eq(&local_env.data, &daemon_env.data));
      }

      #[test]
      fn test_for_thread() {
          let daemon_env = DaemonEnv::create();
          let thread_env = daemon_env.for_thread();
          let join_handle = std::thread::spawn(move || {
              thread_env.attach();
              let local_env = ThreadEnv::upgrade();
              ThreadEnv::detach();
              local_env
          });
          let local_env = join_handle
              .join()
              .expect("the spawned thread should not panic");
          assert_same_env(&local_env, &daemon_env);
      }

      #[test]
      fn test_for_scope() {
          let daemon_env = DaemonEnv::create();
          let local_env = {
              let _guard = daemon_env.for_scope();
              ThreadEnv::upgrade()
          };
          assert_same_env(&local_env, &daemon_env);
          // `daemon_env` is still alive here, so the thread-local weak pointer
          // would upgrade if the guard had not detached it. Checking
          // `weak_count() == 0` after dropping every `DaemonEnv` would pass
          // vacuously, because `Weak::weak_count` returns 0 once no strong
          // pointers remain.
          ThreadEnv::ENV
              .with(|env| assert!(env.borrow().data.upgrade().is_none()));
      }

      #[test]
      fn test_record_error() {
          let daemon_env = DaemonEnv::create();
          {
              let _guard = daemon_env.for_scope();
              let state = RaftState::<i32>::create(1, Peer(0));
              check_or_record!(
                  0 > 1,
                  ErrorKind::SnapshotAfterLogEnd(1),
                  "Just kidding",
                  &state
              )
          }
          let guard = daemon_env.data.lock();
          let errors = &guard.errors;
          assert_eq!(errors.len(), 1);
          assert!(matches!(
              errors[0].error_kind,
              ErrorKind::SnapshotAfterLogEnd(1)
          ));
          assert_eq!(&errors[0].message, "Just kidding");
      }

      #[test]
      fn test_watch_daemon_shutdown() {
          let daemon_env = DaemonEnv::create();
          let panic_thread = std::thread::spawn(|| {
              panic!("message with type &str");
          });
          daemon_env.watch_daemon(Daemon::ApplyCommand, panic_thread);
          let another_panic_thread = std::thread::spawn(|| {
              panic!("message with type {:?}", "debug string");
          });
          daemon_env.watch_daemon(Daemon::Snapshot, another_panic_thread);
          let result = std::thread::spawn(move || {
              daemon_env.shutdown();
          })
          .join();
          let message = result.expect_err("shutdown should have panicked");
          let message = message
              .downcast_ref::<String>()
              .expect("Error message should be a string.");
          assert_eq!(
              message,
              "\n2 daemon panic(s):\nDaemon ApplyCommand panicked: message with type &str\nDaemon Snapshot panicked: message with type \"debug string\"\n0 error(s):\n"
          );
      }
  }