daemon_env.rs 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. use std::marker::PhantomData;
  2. use std::sync::Arc;
  3. use parking_lot::Mutex;
  4. use crate::index_term::IndexTerm;
  5. use crate::{Peer, RaftState, State, Term};
  /// Checks an invariant and records an error on the daemon env when it fails.
  ///
  /// Arguments, in order:
  /// * `$daemon_env` - a value with a `record_error` method taking
  ///   `(error_kind, message, raft_state, file_line)`;
  /// * `$condition` - the boolean expression that is expected to hold;
  /// * `$error_kind`, `$message`, `$rf` - forwarded unchanged to `record_error`.
  ///
  /// The condition is always evaluated exactly once. Every other argument is
  /// evaluated only when the condition is false. The call site is passed along
  /// as a `"<file>:<line>"` string built at compile time.
  #[macro_export]
  macro_rules! check_or_record {
      ($daemon_env:expr, $condition:expr, $error_kind:expr, $message:expr, $rf:expr) => {{
          let invariant_holds: bool = $condition;
          if !invariant_holds {
              $daemon_env.record_error(
                  $error_kind,
                  $message,
                  $rf,
                  concat!(file!(), ":", line!()),
              )
          }
      }};
  }
  19. #[derive(Clone, Debug, Default)]
  20. pub(crate) struct DaemonEnv<T> {
  21. data: Arc<Mutex<DaemonEnvData<T>>>,
  22. }
  23. #[derive(Debug, Default)]
  24. struct DaemonEnvData<T> {
  25. errors: Vec<Error>,
  26. daemons: Vec<std::thread::JoinHandle<()>>,
  27. phantom: PhantomData<T>,
  28. }
  29. #[derive(Debug)]
  30. pub(crate) struct Error {
  31. error_kind: ErrorKind,
  32. message: String,
  33. raft_state: StrippedRaftState,
  34. file_line: &'static str,
  35. }
  /// Categories of errors that can be recorded in a `DaemonEnv`.
  #[derive(Debug)]
  pub(crate) enum ErrorKind {
      /// NOTE(review): judging by the name, a committed log entry was about to
      /// be rolled back, and the `usize` is presumably the log index involved.
      /// Confirm against the call sites.
      RollbackCommitted(usize),
  }
  40. impl<T> DaemonEnv<T> {
  41. pub fn record_error<S: AsRef<str>>(
  42. &self,
  43. error_kind: ErrorKind,
  44. message: S,
  45. raft_state: &RaftState<T>,
  46. file_line: &'static str,
  47. ) {
  48. self.data.lock().errors.push(Error {
  49. error_kind,
  50. message: message.as_ref().into(),
  51. raft_state: Self::strip_data(raft_state),
  52. file_line,
  53. })
  54. }
  55. pub fn watch_daemon(&self, thread: std::thread::JoinHandle<()>) {
  56. self.data.lock().daemons.push(thread);
  57. }
  58. pub fn shutdown(self) {
  59. let data = Arc::try_unwrap(self.data)
  60. .unwrap_or_else(|_| {
  61. panic!("No one should be holding daemon env at shutdown.")
  62. })
  63. .into_inner();
  64. let daemon_panics: Vec<String> = data
  65. .daemons
  66. .into_iter()
  67. .filter_map(|join_handle| {
  68. let err = join_handle.join().err()?;
  69. let err_str = err
  70. .downcast_ref::<&str>()
  71. .map_or("unknown panic error", |s| s.to_owned());
  72. Some("\n".to_owned() + err_str)
  73. })
  74. .collect();
  75. let recorded_errors: Vec<String> = data
  76. .errors
  77. .iter()
  78. .map(|error| format!("\n{:?}", error))
  79. .collect();
  80. if !daemon_panics.is_empty() || !recorded_errors.is_empty() {
  81. // Do not panic again if we are cleaning up panicking threads.
  82. if std::thread::panicking() {
  83. eprintln!(
  84. "\n{} daemon panic(s):{}\n{} error(s):{}\n",
  85. daemon_panics.len(),
  86. daemon_panics.join(""),
  87. recorded_errors.len(),
  88. recorded_errors.join("")
  89. )
  90. } else {
  91. panic!(
  92. "\n{} daemon panic(s):{}\n{} error(s):{}\n",
  93. daemon_panics.len(),
  94. daemon_panics.join(""),
  95. recorded_errors.len(),
  96. recorded_errors.join("")
  97. )
  98. }
  99. }
  100. }
  101. fn strip_data(raft: &RaftState<T>) -> StrippedRaftState {
  102. StrippedRaftState {
  103. current_term: raft.current_term,
  104. voted_for: raft.voted_for,
  105. log: raft.log.all().iter().map(|s| s.into()).collect(),
  106. commit_index: raft.commit_index,
  107. last_applied: raft.last_applied,
  108. state: raft.state,
  109. leader_id: raft.leader_id,
  110. }
  111. }
  112. }
  113. #[derive(Debug)]
  114. struct StrippedRaftState {
  115. current_term: Term,
  116. voted_for: Option<Peer>,
  117. log: Vec<IndexTerm>,
  118. commit_index: usize,
  119. last_applied: usize,
  120. state: State,
  121. leader_id: Peer,
  122. }