瀏覽代碼

Give each daemon a name.

Jing Yang 4 年之前
父節點
當前提交
5e75abc074
共有 5 個文件被更改,包括 31 次插入14 次删除
  1. 3 1
      src/apply_command.rs
  2. 20 8
      src/daemon_env.rs
  3. 3 1
      src/election.rs
  4. 2 2
      src/snapshot.rs
  5. 3 2
      src/sync_log_entries.rs

+ 3 - 1
src/apply_command.rs

@@ -1,6 +1,7 @@
 use std::sync::atomic::Ordering;
 use std::time::Duration;
 
+use crate::daemon_env::Daemon;
 use crate::{Index, Raft, Snapshot, HEARTBEAT_INTERVAL_MILLIS};
 
 pub enum ApplyCommandMessage<Command> {
@@ -114,6 +115,7 @@ where
 
             drop(stop_wait_group);
         });
-        self.daemon_env.watch_daemon(join_handle);
+        self.daemon_env
+            .watch_daemon(Daemon::ApplyCommand, join_handle);
     }
 }

+ 20 - 8
src/daemon_env.rs

@@ -33,10 +33,18 @@ pub(crate) struct DaemonEnv {
     thread_env: ThreadEnv,
 }
 
+#[derive(Debug)]
+pub(crate) enum Daemon {
+    Snapshot,
+    ElectionTimer,
+    SyncLogEntries,
+    ApplyCommand,
+}
+
 #[derive(Debug, Default)]
 struct DaemonEnvData {
     errors: Vec<Error>,
-    daemons: Vec<std::thread::JoinHandle<()>>,
+    daemons: Vec<(Daemon, std::thread::JoinHandle<()>)>,
 }
 
 #[derive(Debug)]
@@ -106,8 +114,12 @@ impl DaemonEnv {
 
     /// Register a daemon thread to make sure it is correctly shutdown when the
     /// Raft instance is killed.
-    pub fn watch_daemon(&self, thread: std::thread::JoinHandle<()>) {
-        self.data.lock().daemons.push(thread);
+    pub fn watch_daemon(
+        &self,
+        daemon: Daemon,
+        thread: std::thread::JoinHandle<()>,
+    ) {
+        self.data.lock().daemons.push((daemon, thread));
     }
 
     /// Makes sure that all daemons have been shutdown, no more errors can be
@@ -121,14 +133,14 @@ impl DaemonEnv {
         let daemon_panics: Vec<String> = data
             .daemons
             .into_iter()
-            .filter_map(|join_handle| {
+            .filter_map(|(daemon, join_handle)| {
                 let err = join_handle.join().err()?;
                 let err_str = err.downcast_ref::<&str>().map(|s| s.to_owned());
                 let err_string =
                     err.downcast_ref::<String>().map(|s| s.as_str());
                 let err =
                     err_str.or(err_string).unwrap_or("unknown panic error");
-                Some("\n".to_owned() + err)
+                Some(format!("\nDaemon {:?} panicked: {}", daemon, err))
             })
             .collect();
         let recorded_errors: Vec<String> = data
@@ -336,11 +348,11 @@ mod tests {
         let panic_thread = std::thread::spawn(|| {
             panic!("message with type &str");
         });
-        daemon_env.watch_daemon(panic_thread);
+        daemon_env.watch_daemon(Daemon::ApplyCommand, panic_thread);
         let another_panic_thread = std::thread::spawn(|| {
             panic!("message with type {:?}", "debug string");
         });
-        daemon_env.watch_daemon(another_panic_thread);
+        daemon_env.watch_daemon(Daemon::Snapshot, another_panic_thread);
 
         let result = std::thread::spawn(move || {
             daemon_env.shutdown();
@@ -352,7 +364,7 @@ mod tests {
             .expect("Error message should be a string.");
         assert_eq!(
             message,
-            "\n2 daemon panic(s):\nmessage with type &str\nmessage with type \"debug string\"\n0 error(s):\n"
+            "\n2 daemon panic(s):\nDaemon ApplyCommand panicked: message with type &str\nDaemon Snapshot panicked: message with type \"debug string\"\n0 error(s):\n"
         );
     }
 }

+ 3 - 1
src/election.rs

@@ -5,6 +5,7 @@ use std::time::{Duration, Instant};
 use parking_lot::{Condvar, Mutex};
 use rand::{thread_rng, Rng};
 
+use crate::daemon_env::Daemon;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, RPC_DEADLINE};
 use crate::{Peer, Raft, RaftState, RequestVoteArgs, RpcClient, State, Term};
@@ -205,7 +206,8 @@ where
             drop(this);
             drop(stop_wait_group);
         });
-        self.daemon_env.watch_daemon(join_handle);
+        self.daemon_env
+            .watch_daemon(Daemon::ElectionTimer, join_handle);
     }
 
     fn run_election(

+ 2 - 2
src/snapshot.rs

@@ -5,7 +5,7 @@ use crossbeam_utils::sync::{Parker, Unparker};
 use parking_lot::{Condvar, Mutex};
 
 use crate::check_or_record;
-use crate::daemon_env::ErrorKind;
+use crate::daemon_env::{Daemon, ErrorKind};
 use crate::{Index, Raft};
 
 #[derive(Clone, Debug, Default)]
@@ -190,6 +190,6 @@ impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
                 );
             }
         });
-        self.daemon_env.watch_daemon(join_handle);
+        self.daemon_env.watch_daemon(Daemon::Snapshot, join_handle);
     }
 }

+ 3 - 2
src/sync_log_entries.rs

@@ -5,7 +5,7 @@ use std::time::Duration;
 use parking_lot::{Condvar, Mutex};
 
 use crate::check_or_record;
-use crate::daemon_env::ErrorKind;
+use crate::daemon_env::{Daemon, ErrorKind};
 use crate::index_term::IndexTerm;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, RPC_DEADLINE};
@@ -104,7 +104,8 @@ where
             drop(this);
             drop(stop_wait_group);
         });
-        self.daemon_env.watch_daemon(join_handle);
+        self.daemon_env
+            .watch_daemon(Daemon::SyncLogEntries, join_handle);
     }
 
     /// Syncs log entries to a peer once, requests a new sync if that fails.