Explorar el Código

Name each daemon thread at creation.

Jing Yang hace 3 años
padre
commit
56e5b19355
Se han modificado 6 ficheros con 31 adiciones y 24 borrados
  1. 3 3
      src/apply_command.rs
  2. 14 9
      src/daemon_env.rs
  3. 4 3
      src/election.rs
  4. 4 3
      src/snapshot.rs
  5. 3 3
      src/sync_log_entries.rs
  6. 3 3
      src/verify_authority.rs

+ 3 - 3
src/apply_command.rs

@@ -57,7 +57,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         let snapshot_daemon = self.snapshot_daemon.clone();
         let daemon_env = self.daemon_env.clone();
         let stop_wait_group = self.stop_wait_group.clone();
-        let join_handle = std::thread::spawn(move || {
+        let apply_command_daemon = move || {
             // Note: do not change this to `let _ = ...`.
             let _guard = daemon_env.for_scope();
             log::info!("{:?} apply command daemon running ...", me);
@@ -116,8 +116,8 @@ impl<Command: ReplicableCommand> Raft<Command> {
             log::info!("{:?} apply command daemon done.", me);
 
             drop(stop_wait_group);
-        });
+        };
         self.daemon_env
-            .watch_daemon(Daemon::ApplyCommand, join_handle);
+            .watch_daemon(Daemon::ApplyCommand, apply_command_daemon);
     }
 }

+ 14 - 9
src/daemon_env.rs

@@ -119,11 +119,18 @@ impl DaemonEnv {
 
     /// Register a daemon thread to make sure it is correctly shutdown when the
     /// Raft instance is killed.
-    pub fn watch_daemon(
-        &self,
-        daemon: Daemon,
-        thread: std::thread::JoinHandle<()>,
-    ) {
+    pub fn watch_daemon<F, T>(&self, daemon: Daemon, func: F)
+    where
+        F: FnOnce() -> T,
+        F: Send + 'static,
+        T: Send + 'static,
+    {
+        let thread = std::thread::Builder::new()
+            .name(format!("ruaft-daemon-{:?}", daemon))
+            .spawn(move || {
+                func();
+            })
+            .expect("Creating daemon thread should never fail");
         self.data.lock().daemons.push((daemon, thread));
     }
 
@@ -358,14 +365,12 @@ mod tests {
     #[test]
     fn test_watch_daemon_shutdown() {
         let daemon_env = DaemonEnv::create();
-        let panic_thread = std::thread::spawn(|| {
+        daemon_env.watch_daemon(Daemon::ApplyCommand, || {
             panic!("message with type &str");
         });
-        daemon_env.watch_daemon(Daemon::ApplyCommand, panic_thread);
-        let another_panic_thread = std::thread::spawn(|| {
+        daemon_env.watch_daemon(Daemon::Snapshot, || {
             panic!("message with type {:?}", "debug string");
         });
-        daemon_env.watch_daemon(Daemon::Snapshot, another_panic_thread);
 
         let result = std::thread::spawn(move || {
             daemon_env.shutdown();

+ 4 - 3
src/election.rs

@@ -120,7 +120,8 @@ impl<Command: ReplicableCommand> Raft<Command> {
     /// eventually realize the term they were competing for has passed and quit.
     pub(crate) fn run_election_timer(&self) {
         let this = self.clone();
-        let join_handle = std::thread::spawn(move || {
+
+        let election_daemon = move || {
             // Note: do not change this to `let _ = ...`.
             let _guard = this.daemon_env.for_scope();
             log::info!("{:?} election timer daemon running ...", this.me);
@@ -210,9 +211,9 @@ impl<Command: ReplicableCommand> Raft<Command> {
             // Making sure the rest of `this` is dropped before the wait group.
             drop(this);
             drop(stop_wait_group);
-        });
+        };
         self.daemon_env
-            .watch_daemon(Daemon::ElectionTimer, join_handle);
+            .watch_daemon(Daemon::ElectionTimer, election_daemon);
     }
 
     fn run_election(

+ 4 - 3
src/snapshot.rs

@@ -146,7 +146,7 @@ impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
         let stop_wait_group = self.stop_wait_group.clone();
 
         log::info!("{:?} snapshot daemon running ...", me);
-        let join_handle = std::thread::spawn(move || loop {
+        let snapshot_daemon = move || loop {
             // Note: do not change this to `let _ = ...`.
             let _guard = daemon_env.for_scope();
 
@@ -223,7 +223,8 @@ impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
                     rf.log.snapshot().1,
                 );
             }
-        });
-        self.daemon_env.watch_daemon(Daemon::Snapshot, join_handle);
+        };
+        self.daemon_env
+            .watch_daemon(Daemon::Snapshot, snapshot_daemon);
     }
 }

+ 3 - 3
src/sync_log_entries.rs

@@ -61,7 +61,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
 
         // Clone everything that the thread needs.
         let this = self.clone();
-        let join_handle = std::thread::spawn(move || {
+        let sync_log_entry_daemon = move || {
             // Note: do not change this to `let _ = ...`.
             let _guard = this.daemon_env.for_scope();
 
@@ -110,9 +110,9 @@ impl<Command: ReplicableCommand> Raft<Command> {
             // Making sure the rest of `this` is dropped before the wait group.
             drop(this);
             drop(stop_wait_group);
-        });
+        };
         self.daemon_env
-            .watch_daemon(Daemon::SyncLogEntries, join_handle);
+            .watch_daemon(Daemon::SyncLogEntries, sync_log_entry_daemon);
     }
 
     /// Syncs log entries to a peer once, requests a new sync if that fails.

+ 3 - 3
src/verify_authority.rs

@@ -380,7 +380,7 @@ impl<Command: 'static + Send> Raft<Command> {
         let rf = self.inner_state.clone();
         let stop_wait_group = self.stop_wait_group.clone();
 
-        let join_handle = std::thread::spawn(move || {
+        let verify_authority_daemon = move || {
             // Note: do not change this to `let _ = ...`.
             let _guard = daemon_env.for_scope();
 
@@ -400,9 +400,9 @@ impl<Command: 'static + Send> Raft<Command> {
             log::info!("{:?} verify authority daemon done.", me);
 
             drop(stop_wait_group);
-        });
+        };
         self.daemon_env
-            .watch_daemon(Daemon::VerifyAuthority, join_handle);
+            .watch_daemon(Daemon::VerifyAuthority, verify_authority_daemon);
     }
 
     /// Create a verify authority request. Returns None if we are not the