daemon_watch.rs 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. use crate::daemon_env::ThreadEnv;
  2. use crossbeam_utils::sync::WaitGroup;
  3. #[derive(Debug)]
  4. pub(crate) enum Daemon {
  5. Snapshot,
  6. ElectionTimer,
  7. SyncLogEntries,
  8. ApplyCommand,
  9. VerifyAuthority,
  10. }
  11. /// A guard for daemons.
  12. ///
  13. /// [`DaemonWatch`] manages daemon threads and makes sure that panics are
  14. /// recorded during shutdown. It collects daemon panics and send them to
  15. /// [`crate::DaemonEnv`].
  16. pub(crate) struct DaemonWatch {
  17. daemons: Vec<(Daemon, std::thread::JoinHandle<()>)>,
  18. thread_env: ThreadEnv,
  19. stop_wait_group: WaitGroup,
  20. }
  21. impl DaemonWatch {
  22. pub fn create(thread_env: ThreadEnv) -> Self {
  23. Self {
  24. daemons: vec![],
  25. thread_env,
  26. stop_wait_group: WaitGroup::new(),
  27. }
  28. }
  29. /// Register a daemon thread to make sure it is correctly shutdown when the
  30. /// Raft instance is killed.
  31. pub fn create_daemon<F, T>(&mut self, daemon: Daemon, func: F)
  32. where
  33. F: FnOnce() -> T,
  34. F: Send + 'static,
  35. T: Send + 'static,
  36. {
  37. let thread_env = self.thread_env.clone();
  38. let stop_wait_group = self.stop_wait_group.clone();
  39. let thread = std::thread::Builder::new()
  40. .name(format!("ruaft-daemon-{:?}", daemon))
  41. .spawn(move || {
  42. thread_env.attach();
  43. func();
  44. ThreadEnv::detach();
  45. drop(stop_wait_group);
  46. })
  47. .expect("Creating daemon thread should never fail");
  48. self.daemons.push((daemon, thread));
  49. }
  50. pub fn wait_for_daemons(self) {
  51. self.stop_wait_group.wait();
  52. self.thread_env.attach();
  53. for (daemon, join_handle) in self.daemons.into_iter() {
  54. if let Some(err) = join_handle.join().err() {
  55. let err_str = err.downcast_ref::<&str>().map(|s| s.to_owned());
  56. let err_string =
  57. err.downcast_ref::<String>().map(|s| s.as_str());
  58. let err =
  59. err_str.or(err_string).unwrap_or("unknown panic error");
  60. ThreadEnv::upgrade().record_panic(daemon, err);
  61. }
  62. }
  63. ThreadEnv::detach();
  64. }
  65. }
  66. #[cfg(test)]
  67. mod tests {
  68. use super::*;
  69. use crate::daemon_env::DaemonEnv;
  70. #[test]
  71. fn test_watch_daemon_shutdown() {
  72. let daemon_env = DaemonEnv::create();
  73. let mut daemon_watch = DaemonWatch::create(daemon_env.for_thread());
  74. daemon_watch.create_daemon(Daemon::ApplyCommand, || {
  75. panic!("message with type &str");
  76. });
  77. daemon_watch.create_daemon(Daemon::Snapshot, || {
  78. panic!("message with type {:?}", "debug string");
  79. });
  80. daemon_watch.wait_for_daemons();
  81. let result = std::thread::spawn(move || {
  82. daemon_env.shutdown();
  83. })
  84. .join();
  85. let message = result.expect_err("shutdown should have panicked");
  86. let message = message
  87. .downcast_ref::<String>()
  88. .expect("Error message should be a string.");
  89. assert_eq!(
  90. message,
  91. "\n2 daemon panic(s):\nDaemon ApplyCommand panicked: message with type &str\nDaemon Snapshot panicked: message with type \"debug string\"\n0 error(s):\n"
  92. );
  93. }
  94. }