Jelajahi Sumber

Watch thread panics and record errors.

At shutdown, join all daemon thread handles and make sure they did
not panic. Also assert that nothing unexpected happened during the
whole runtime. More assertions will be added in the future.
Jing Yang 4 tahun lalu
induk
melakukan
d28dc76eb5
6 mengubah file dengan 181 tambahan dan 14 penghapusan
  1. 4 3
      src/apply_command.rs
  2. 155 0
      src/daemon_env.rs
  3. 19 8
      src/lib.rs
  4. 0 1
      src/log_array.rs
  5. 1 1
      src/raft_state.rs
  6. 2 1
      src/snapshot.rs

+ 4 - 3
src/apply_command.rs

@@ -25,13 +25,13 @@ where
     pub(crate) fn run_apply_command_daemon(
         &self,
         mut apply_command: impl ApplyCommandFnMut<Command>,
-    ) -> std::thread::JoinHandle<()> {
+    ) {
         let keep_running = self.keep_running.clone();
         let rf = self.inner_state.clone();
         let condvar = self.apply_command_signal.clone();
         let snapshot_daemon = self.snapshot_daemon.clone();
         let stop_wait_group = self.stop_wait_group.clone();
-        std::thread::spawn(move || {
+        let join_handle = std::thread::spawn(move || {
             while keep_running.load(Ordering::SeqCst) {
                 let messages = {
                     let mut rf = rf.lock();
@@ -79,6 +79,7 @@ where
             }
 
             drop(stop_wait_group);
-        })
+        });
+        self.daemon_env.watch_daemon(join_handle);
     }
 }

+ 155 - 0
src/daemon_env.rs

@@ -0,0 +1,155 @@
+use std::marker::PhantomData;
+use std::sync::Arc;
+
+use parking_lot::Mutex;
+
+use crate::index_term::IndexTerm;
+use crate::{Peer, RaftState, State, Term};
+
+#[macro_export]
+macro_rules! check_or_record {
+    ($daemon_env:expr, $condition:expr, $component:expr, $error_kind:expr, $message:expr, $rf:expr) => {
+        if !$condition {
+            $daemon_env.record_error(
+                $component,
+                $error_kind,
+                $message,
+                $rf,
+                concat!(file!(), ":", line!()),
+            )
+        }
+    };
+}
+
+#[derive(Clone, Debug, Default)]
+pub(crate) struct DaemonEnv<T> {
+    data: Arc<Mutex<DaemonEnvData<T>>>,
+}
+
+#[derive(Debug, Default)]
+struct DaemonEnvData<T> {
+    errors: Vec<Error>,
+    daemons: Vec<std::thread::JoinHandle<()>>,
+    phantom: PhantomData<T>,
+}
+
+#[derive(Debug)]
+pub(crate) struct Error {
+    component: Component,
+    error_kind: ErrorKind,
+    message: String,
+    raft_state: StrippedRaftState,
+    file_line: &'static str,
+}
+
+#[derive(Debug)]
+pub(crate) enum ErrorKind {
+    RollbackCommitted(usize),
+}
+
+#[allow(dead_code)]
+#[derive(Debug)]
+pub(crate) enum Component {
+    // Daemon threads.
+    Election,
+    SyncLogEntry,
+    ApplyCommand,
+    Snapshot,
+    // Daemon tasks
+    VoteCountingTask,
+    SyncLogEntryTask,
+    // RPC handlers
+    InstallSnapshot,
+    AppendEntries,
+    RequestVote,
+}
+
+impl<T> DaemonEnv<T> {
+    pub fn record_error<S: AsRef<str>>(
+        &self,
+        component: Component,
+        error_kind: ErrorKind,
+        message: S,
+        raft_state: &RaftState<T>,
+        file_line: &'static str,
+    ) {
+        self.data.lock().errors.push(Error {
+            component,
+            error_kind,
+            message: message.as_ref().into(),
+            raft_state: Self::strip_data(raft_state),
+            file_line,
+        })
+    }
+
+    pub fn watch_daemon(&self, thread: std::thread::JoinHandle<()>) {
+        self.data.lock().daemons.push(thread);
+    }
+
+    pub fn shutdown(self) {
+        let data = Arc::try_unwrap(self.data)
+            .unwrap_or_else(|_| {
+                panic!("No one should be holding daemon env at shutdown.")
+            })
+            .into_inner();
+        let daemon_panics: Vec<String> = data
+            .daemons
+            .into_iter()
+            .filter_map(|join_handle| {
+                let err = join_handle.join().err()?;
+                let err_str = err
+                    .downcast_ref::<&str>()
+                    .map_or("unknown panic error", |s| s.to_owned());
+                Some("\n".to_owned() + err_str)
+            })
+            .collect();
+        let recorded_errors: Vec<String> = data
+            .errors
+            .iter()
+            .map(|error| format!("\n{:?}", error))
+            .collect();
+        if !daemon_panics.is_empty() || !recorded_errors.is_empty() {
+            // Do not panic again if we are cleaning up panicking threads.
+            if std::thread::panicking() {
+                eprintln!(
+                    "\n{} daemon panic(s):{}\n{} error(s):{}\n",
+                    daemon_panics.len(),
+                    daemon_panics.join(""),
+                    recorded_errors.len(),
+                    recorded_errors.join("")
+                )
+            } else {
+                panic!(
+                    "\n{} daemon panic(s):{}\n{} error(s):{}\n",
+                    daemon_panics.len(),
+                    daemon_panics.join(""),
+                    recorded_errors.len(),
+                    recorded_errors.join("")
+                )
+            }
+        }
+    }
+
+    fn strip_data(raft: &RaftState<T>) -> StrippedRaftState {
+        StrippedRaftState {
+            current_term: raft.current_term,
+            voted_for: raft.voted_for,
+            log: raft.log.all().iter().map(|s| s.into()).collect(),
+            commit_index: raft.commit_index,
+            last_applied: raft.last_applied,
+            state: raft.state,
+            leader_id: raft.leader_id,
+        }
+    }
+}
+
+#[derive(Debug)]
+struct StrippedRaftState {
+    current_term: Term,
+    voted_for: Option<Peer>,
+    log: Vec<IndexTerm>,
+    commit_index: usize,
+    last_applied: usize,
+    state: State,
+    leader_id: Peer,
+}

+ 19 - 8
src/lib.rs

@@ -18,6 +18,7 @@ use rand::{thread_rng, Rng};
 
 use crate::apply_command::ApplyCommandFnMut;
 pub use crate::apply_command::ApplyCommandMessage;
+use crate::daemon_env::{Component, DaemonEnv, ErrorKind};
 use crate::index_term::IndexTerm;
 use crate::install_snapshot::InstallSnapshotArgs;
 use crate::persister::PersistedRaftState;
@@ -30,6 +31,7 @@ use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
 use crate::utils::retry_rpc;
 
 mod apply_command;
+mod daemon_env;
 mod index_term;
 mod install_snapshot;
 mod log_array;
@@ -79,6 +81,7 @@ pub struct Raft<Command> {
 
     thread_pool: Arc<tokio::runtime::Runtime>,
 
+    daemon_env: DaemonEnv<Command>,
     stop_wait_group: WaitGroup,
 }
 
@@ -191,6 +194,7 @@ where
             election: Arc::new(election),
             snapshot_daemon: Default::default(),
             thread_pool: Arc::new(thread_pool),
+            daemon_env: Default::default(),
             stop_wait_group: WaitGroup::new(),
         };
 
@@ -308,9 +312,13 @@ where
             let index = i + args.prev_log_index + 1;
             if rf.log.end() > index {
                 if rf.log[index].term != entry.term {
-                    assert!(
+                    check_or_record!(
+                        self.daemon_env,
                         index > rf.commit_index,
-                        "Entries before commit index should never be rolled back"
+                        Component::AppendEntries,
+                        ErrorKind::RollbackCommitted(index),
+                        "Entries before commit index should never be rolled back",
+                        &rf
                     );
                     rf.log.truncate(index);
                     rf.log.push(entry.clone());
@@ -363,9 +371,9 @@ impl<Command> Raft<Command>
 where
     Command: 'static + Clone + Send + serde::Serialize + Default,
 {
-    fn run_election_timer(&self) -> std::thread::JoinHandle<()> {
+    fn run_election_timer(&self) {
         let this = self.clone();
-        std::thread::spawn(move || {
+        let join_handle = std::thread::spawn(move || {
             let election = this.election.clone();
 
             let mut should_run = None;
@@ -435,7 +443,8 @@ where
             // Making sure the rest of `this` is dropped before the wait group.
             drop(this);
             drop(stop_wait_group);
-        })
+        });
+        self.daemon_env.watch_daemon(join_handle);
     }
 
     fn run_election(
@@ -660,13 +669,13 @@ where
         Ok(())
     }
 
-    fn run_log_entry_daemon(&mut self) -> std::thread::JoinHandle<()> {
+    fn run_log_entry_daemon(&mut self) {
         let (tx, rx) = std::sync::mpsc::channel::<Option<Peer>>();
         self.new_log_entry.replace(tx);
 
         // Clone everything that the thread needs.
         let this = self.clone();
-        std::thread::spawn(move || {
+        let join_handle = std::thread::spawn(move || {
             let mut openings = vec![];
             openings.resize_with(this.peers.len(), || {
                 Opening(Arc::new(AtomicUsize::new(0)))
@@ -703,7 +712,8 @@ where
             // Making sure the rest of `this` is dropped before the wait group.
             drop(this);
             drop(stop_wait_group);
-        })
+        });
+        self.daemon_env.watch_daemon(join_handle);
     }
 
     async fn sync_log_entry(
@@ -932,6 +942,7 @@ where
         self.apply_command_signal.notify_all();
         self.snapshot_daemon.kill();
         self.stop_wait_group.wait();
+        self.daemon_env.shutdown();
         std::sync::Arc::try_unwrap(self.thread_pool)
             .expect(
                 "All references to the thread pool should have been dropped.",

+ 0 - 1
src/log_array.rs

@@ -96,7 +96,6 @@ impl<C> LogArray<C> {
     }
 
     /// All log entries stored in the array.
-    #[cfg(test)]
     pub fn all(&self) -> &[LogEntry<C>] {
         &self.inner[..]
     }

+ 1 - 1
src/raft_state.rs

@@ -2,7 +2,7 @@ use crate::{
     log_array::LogArray, persister::PersistedRaftState, Index, Peer, Term,
 };
 
-#[derive(Debug, Eq, PartialEq)]
+#[derive(Copy, Clone, Debug, Eq, PartialEq)]
 pub(crate) enum State {
     Follower,
     Candidate,

+ 2 - 1
src/snapshot.rs

@@ -80,7 +80,7 @@ impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
         let snapshot_daemon = self.snapshot_daemon.clone();
         let stop_wait_group = self.stop_wait_group.clone();
 
-        std::thread::spawn(move || loop {
+        let join_handle = std::thread::spawn(move || loop {
             parker.park();
             if !keep_running.load(Ordering::SeqCst) {
                 // Explicitly drop every thing.
@@ -127,5 +127,6 @@ impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
                 );
             }
         });
+        self.daemon_env.watch_daemon(join_handle);
     }
 }