daemon_env.rs 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. use std::cell::RefCell;
  2. use std::sync::{Arc, Weak};
  3. use parking_lot::Mutex;
  4. use crate::index_term::IndexTerm;
  5. use crate::{Peer, RaftState, State, Term};
  6. /// A convenient macro to record errors.
  7. #[macro_export]
  8. macro_rules! check_or_record {
  9. ($condition:expr, $error_kind:expr, $message:expr, $rf:expr) => {
  10. if !$condition {
  11. crate::daemon_env::ThreadEnv::upgrade().record_error(
  12. $error_kind,
  13. $message,
  14. $rf,
  15. concat!(file!(), ":", line!()),
  16. )
  17. }
  18. };
  19. }
  20. /// Environment for daemons.
  21. ///
  22. /// Each daemon thread should hold a copy of this struct, either directly or
  23. /// through a copy of [`crate::Raft`]. It can be used for logging unexpected
  24. /// errors to a central location, which cause a failure at shutdown. It also
  25. /// checks daemon thread panics and collect information if they do.
  26. #[derive(Clone, Debug)]
  27. pub(crate) struct DaemonEnv {
  28. data: Arc<Mutex<DaemonEnvData>>,
  29. thread_env: ThreadEnv,
  30. }
  31. #[derive(Debug, Default)]
  32. struct DaemonEnvData {
  33. errors: Vec<Error>,
  34. daemons: Vec<std::thread::JoinHandle<()>>,
  35. }
  36. #[derive(Debug)]
  37. pub(crate) struct Error {
  38. error_kind: ErrorKind,
  39. message: String,
  40. raft_state: StrippedRaftState,
  41. file_line: &'static str,
  42. }
  43. #[derive(Debug)]
  44. pub(crate) enum ErrorKind {
  45. /// The leader sent log entries that do not match a committed log entry.
  46. /// It could be caused by a misbehaving leader, or that the leader should
  47. /// never have been elected, or that this Raft instance incorrectly moved
  48. /// its commit index.
  49. RollbackCommitted(usize),
  50. /// Similar to [`Self::RollbackCommitted`], but the leader sent a snapshot
  51. /// that is inconsistent with a committed log entry.
  52. SnapshotBeforeCommitted(usize, Term),
  53. /// The application sent a snapshot that contains items beyond the log end.
  54. SnapshotAfterLogEnd(usize),
  55. }
  56. impl DaemonEnv {
  57. /// Record an error, with a stripped version of the state of this instance.
  58. /// Use macro `check_or_record` to auto populate the environment.
  59. pub fn record_error<T, S: AsRef<str>>(
  60. &self,
  61. error_kind: ErrorKind,
  62. message: S,
  63. raft_state: &RaftState<T>,
  64. file_line: &'static str,
  65. ) {
  66. self.data.lock().errors.push(Error {
  67. error_kind,
  68. message: message.as_ref().into(),
  69. raft_state: Self::strip_data(raft_state),
  70. file_line,
  71. })
  72. }
  73. /// Register a daemon thread to make sure it is correctly shutdown when the
  74. /// Raft instance is killed.
  75. pub fn watch_daemon(&self, thread: std::thread::JoinHandle<()>) {
  76. self.data.lock().daemons.push(thread);
  77. }
  78. /// Makes sure that all daemons have been shutdown, no more errors can be
  79. /// added, checks if any error has been added, or if any daemon panicked.
  80. pub fn shutdown(self) {
  81. let data = Arc::try_unwrap(self.data)
  82. .unwrap_or_else(|_| {
  83. panic!("No one should be holding daemon env at shutdown.")
  84. })
  85. .into_inner();
  86. let daemon_panics: Vec<String> = data
  87. .daemons
  88. .into_iter()
  89. .filter_map(|join_handle| {
  90. let err = join_handle.join().err()?;
  91. let err_str = err.downcast_ref::<&str>().map(|s| s.to_owned());
  92. let err_string =
  93. err.downcast_ref::<String>().map(|s| s.as_str());
  94. let err =
  95. err_str.or(err_string).unwrap_or("unknown panic error");
  96. Some("\n".to_owned() + err)
  97. })
  98. .collect();
  99. let recorded_errors: Vec<String> = data
  100. .errors
  101. .iter()
  102. .map(|error| format!("\n{:?}", error))
  103. .collect();
  104. if !daemon_panics.is_empty() || !recorded_errors.is_empty() {
  105. // Do not panic again if we are cleaning up panicking threads.
  106. if std::thread::panicking() {
  107. eprintln!(
  108. "\n{} daemon panic(s):{}\n{} error(s):{}\n",
  109. daemon_panics.len(),
  110. daemon_panics.join(""),
  111. recorded_errors.len(),
  112. recorded_errors.join("")
  113. )
  114. } else {
  115. panic!(
  116. "\n{} daemon panic(s):{}\n{} error(s):{}\n",
  117. daemon_panics.len(),
  118. daemon_panics.join(""),
  119. recorded_errors.len(),
  120. recorded_errors.join("")
  121. )
  122. }
  123. }
  124. }
  125. fn strip_data<T>(raft: &RaftState<T>) -> StrippedRaftState {
  126. StrippedRaftState {
  127. current_term: raft.current_term,
  128. voted_for: raft.voted_for,
  129. log: raft.log.all().iter().map(|s| s.into()).collect(),
  130. commit_index: raft.commit_index,
  131. last_applied: raft.last_applied,
  132. state: raft.state,
  133. leader_id: raft.leader_id,
  134. }
  135. }
  136. }
  137. #[derive(Debug)]
  138. struct StrippedRaftState {
  139. current_term: Term,
  140. voted_for: Option<Peer>,
  141. log: Vec<IndexTerm>,
  142. commit_index: usize,
  143. last_applied: usize,
  144. state: State,
  145. leader_id: Peer,
  146. }
  147. impl DaemonEnv {
  148. /// Creates a daemon environment. Each Raft instance should share the same
  149. /// environment. It should be added to any thread that executes Raft code.
  150. /// Use [`DaemonEnv::for_thread`] or [`DaemonEnv::for_scope`] to register
  151. /// the environment.
  152. pub(crate) fn create() -> Self {
  153. let data = Default::default();
  154. // Pre-create a template thread_env, so that we can clone the weak
  155. // pointer instead of downgrading frequently.
  156. let thread_env = ThreadEnv {
  157. data: Arc::downgrade(&data),
  158. };
  159. Self { data, thread_env }
  160. }
  161. /// Creates a [`ThreadEnv`] that could be attached to a thread. Any code
  162. /// running in the thread can use this `DaemonEnv` to log errors. The thread
  163. /// must be stopped before `DaemonEnv::shutdown()` is called, otherwise it
  164. /// will panic when logging an error.
  165. pub(crate) fn for_thread(&self) -> ThreadEnv {
  166. self.thread_env.clone()
  167. }
  168. /// Creates a [`ThreadEnvGuard`] that registers this `DaemonEnv` in the
  169. /// current scope, which also remove it from the scope when dropped.
  170. pub(crate) fn for_scope(&self) -> ThreadEnvGuard {
  171. self.for_thread().attach();
  172. ThreadEnvGuard {}
  173. }
  174. }
  175. /// A weak reference to a [`DaemonEnv`] that is attached to a thread.
  176. /// Use [`ThreadEnv::attach`] to consume this instance and attach to a thread,
  177. /// and [`ThreadEnv::detach`] to undo that.
  178. #[derive(Clone, Debug, Default)]
  179. pub(crate) struct ThreadEnv {
  180. data: Weak<Mutex<DaemonEnvData>>,
  181. }
  182. impl ThreadEnv {
  183. thread_local! {static ENV: RefCell<ThreadEnv> = Default::default()}
  184. /// Upgrade to the referenced [`DaemonEvn`].
  185. // The dance between Arc<> and Weak<> is complex, but useful:
  186. // 1) We do not have to worry about slow RPC threads causing
  187. // DaemonEnv::shutdown() to fail. They only hold a Weak<> pointer after all;
  188. // 2) We have one system that works both in the environments that we control
  189. // (daemon threads and our own thread pools), and in those we don't (RPC
  190. // handling methods);
  191. // 3) Utils (log_array, persister) can log errors without access to Raft;
  192. // 4) Because of 2), we do not need to expose DaemonEnv externally outside
  193. // this crate, even though there is a public macro referencing it.
  194. //
  195. // On the other hand, the cost is fairly small, because:
  196. // 1) Clone of weak is cheap: one branch plus one relaxed atomic load;
  197. // downgrade is more expensive, but we only do it once;
  198. // 2) Upgrade of weak is expensive, but that only happens when there is
  199. // an error, which should be (knock wood) rare;
  200. // 3) Set and unset a thread_local value is cheap, too.
  201. pub fn upgrade() -> DaemonEnv {
  202. let env = Self::ENV.with(|env| env.borrow().clone());
  203. DaemonEnv {
  204. data: env.data.upgrade().unwrap(),
  205. thread_env: env,
  206. }
  207. }
  208. /// Attach this instance to the current thread.
  209. pub fn attach(self) {
  210. Self::ENV.with(|env| env.replace(self));
  211. }
  212. /// Detach the instance stored in the current thread.
  213. pub fn detach() {
  214. Self::ENV.with(|env| env.replace(Default::default()));
  215. }
  216. }
  217. /// A guard that automatically cleans up the [`ThreadEnv`] attached to the
  218. /// current thread when dropped. It does not restore the previous value.
  219. pub(crate) struct ThreadEnvGuard {}
  220. impl Drop for ThreadEnvGuard {
  221. fn drop(&mut self) {
  222. ThreadEnv::detach()
  223. }
  224. }
  225. // TODO(ditsing): add tests.