Parcourir la source

Refactor: move DaemonWatch out of DaemonEnv.

Jing Yang il y a 3 ans
Parent
commit
5fe90f62fe
9 fichiers modifiés avec 149 ajouts et 94 suppressions
  1. 3 3
      src/apply_command.rs
  2. 16 70
      src/daemon_env.rs
  3. 108 0
      src/daemon_watch.rs
  4. 3 3
      src/election.rs
  5. 1 0
      src/lib.rs
  6. 7 9
      src/raft.rs
  7. 4 3
      src/snapshot.rs
  8. 4 3
      src/sync_log_entries.rs
  9. 3 3
      src/verify_authority.rs

+ 3 - 3
src/apply_command.rs

@@ -1,6 +1,6 @@
 use std::sync::atomic::Ordering;
 
-use crate::daemon_env::Daemon;
+use crate::daemon_watch::Daemon;
 use crate::heartbeats::HEARTBEAT_INTERVAL;
 use crate::{Index, Raft, ReplicableCommand, Snapshot};
 
@@ -111,7 +111,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             }
             log::info!("{:?} apply command daemon done.", me);
         };
-        self.daemon_env
-            .watch_daemon(Daemon::ApplyCommand, apply_command_daemon);
+        self.daemon_watch
+            .create_daemon(Daemon::ApplyCommand, apply_command_daemon);
     }
 }

+ 16 - 70
src/daemon_env.rs

@@ -1,12 +1,12 @@
 use std::cell::RefCell;
 use std::sync::{Arc, Weak};
 
-use crossbeam_utils::sync::WaitGroup;
 use parking_lot::Mutex;
 
 #[cfg(all(not(test), feature = "integration-test"))]
 use test_utils::thread_local_logger::{self, LocalLogger};
 
+use crate::daemon_watch::Daemon;
 use crate::{IndexTerm, Peer, RaftState, State, Term};
 
 /// A convenient macro to record errors.
@@ -29,27 +29,17 @@ macro_rules! check_or_record {
 /// Each daemon thread should hold a copy of this struct, either directly or
 /// through a copy of [`crate::Raft`]. It can be used for logging unexpected
 /// errors to a central location, which cause a failure at shutdown. It also
-/// checks daemon thread panics and collect information if they do.
+/// collects daemon thread panics, which are supplied by [`crate::DaemonWatch`].
 #[derive(Clone, Debug)]
 pub(crate) struct DaemonEnv {
     data: Arc<Mutex<DaemonEnvData>>,
     thread_env: ThreadEnv,
-    stop_wait_group: Option<WaitGroup>,
-}
-
-#[derive(Debug)]
-pub(crate) enum Daemon {
-    Snapshot,
-    ElectionTimer,
-    SyncLogEntries,
-    ApplyCommand,
-    VerifyAuthority,
 }
 
 #[derive(Debug, Default)]
 struct DaemonEnvData {
     errors: Vec<Error>,
-    daemons: Vec<(Daemon, std::thread::JoinHandle<()>)>,
+    panics: Vec<String>,
 }
 
 #[allow(dead_code)]
@@ -119,37 +109,11 @@ impl DaemonEnv {
         })
     }
 
-    /// Register a daemon thread to make sure it is correctly shutdown when the
-    /// Raft instance is killed.
-    pub fn watch_daemon<F, T>(&self, daemon: Daemon, func: F)
-    where
-        F: FnOnce() -> T,
-        F: Send + 'static,
-        T: Send + 'static,
-    {
-        let thread_env = self.for_thread();
-        let stop_wait_group = self
-            .stop_wait_group
-            .clone()
-            .expect("Expecting a valid stop wait group when creating daemons");
-        let thread = std::thread::Builder::new()
-            .name(format!("ruaft-daemon-{:?}", daemon))
-            .spawn(move || {
-                thread_env.attach();
-                func();
-                ThreadEnv::detach();
-                drop(stop_wait_group);
-            })
-            .expect("Creating daemon thread should never fail");
-        self.data.lock().daemons.push((daemon, thread));
-    }
-
-    pub fn wait_for_daemons(&mut self) {
-        if let Some(stop_wait_group) = self.stop_wait_group.take() {
-            stop_wait_group.wait();
-        } else {
-            panic!("Daemons can only be waited once")
-        }
+    pub fn record_panic(&self, daemon: Daemon, err: &str) {
+        self.data
+            .lock()
+            .panics
+            .push(format!("\nDaemon {:?} panicked: {}", daemon, err));
     }
 
     /// Makes sure that all daemons have been shutdown, no more errors can be
@@ -160,19 +124,7 @@ impl DaemonEnv {
                 panic!("No one should be holding daemon env at shutdown.")
             })
             .into_inner();
-        let daemon_panics: Vec<String> = data
-            .daemons
-            .into_iter()
-            .filter_map(|(daemon, join_handle)| {
-                let err = join_handle.join().err()?;
-                let err_str = err.downcast_ref::<&str>().map(|s| s.to_owned());
-                let err_string =
-                    err.downcast_ref::<String>().map(|s| s.as_str());
-                let err =
-                    err_str.or(err_string).unwrap_or("unknown panic error");
-                Some(format!("\nDaemon {:?} panicked: {}", daemon, err))
-            })
-            .collect();
+        let daemon_panics: Vec<String> = data.panics;
         let recorded_errors: Vec<String> = data
             .errors
             .iter()
@@ -240,11 +192,7 @@ impl DaemonEnv {
             local_logger: thread_local_logger::get(),
         };
 
-        Self {
-            data,
-            thread_env,
-            stop_wait_group: Some(WaitGroup::new()),
-        }
+        Self { data, thread_env }
     }
 
     /// Creates a [`ThreadEnv`] that could be attached to a thread. Any code
@@ -298,7 +246,6 @@ impl ThreadEnv {
         DaemonEnv {
             data: env.data.upgrade().unwrap(),
             thread_env: env,
-            stop_wait_group: None,
         }
     }
 
@@ -387,14 +334,13 @@ mod tests {
     }
 
     #[test]
-    fn test_watch_daemon_shutdown() {
+    fn test_record_panic() {
         let daemon_env = DaemonEnv::create();
-        daemon_env.watch_daemon(Daemon::ApplyCommand, || {
-            panic!("message with type &str");
-        });
-        daemon_env.watch_daemon(Daemon::Snapshot, || {
-            panic!("message with type {:?}", "debug string");
-        });
+        daemon_env.record_panic(Daemon::ApplyCommand, "message with type &str");
+        daemon_env.record_panic(
+            Daemon::Snapshot,
+            format!("message with type {:?}", "debug string").as_str(),
+        );
 
         let result = std::thread::spawn(move || {
             daemon_env.shutdown();

+ 108 - 0
src/daemon_watch.rs

@@ -0,0 +1,108 @@
+use crate::daemon_env::ThreadEnv;
+use crossbeam_utils::sync::WaitGroup;
+use parking_lot::Mutex;
+use std::sync::Arc;
+
+#[derive(Debug)]
+pub(crate) enum Daemon {
+    Snapshot,
+    ElectionTimer,
+    SyncLogEntries,
+    ApplyCommand,
+    VerifyAuthority,
+}
+
+/// A guard for daemons.
+///
+/// [`DaemonWatch`] manages daemon threads and makes sure that panics are
+/// recorded during shutdown. It collects daemon panics and sends them to
+/// [`crate::DaemonEnv`].
+#[derive(Clone)]
+pub(crate) struct DaemonWatch {
+    #[allow(clippy::type_complexity)]
+    daemons: Arc<Mutex<Vec<(Daemon, std::thread::JoinHandle<()>)>>>,
+    thread_env: ThreadEnv,
+    stop_wait_group: WaitGroup,
+}
+
+impl DaemonWatch {
+    pub fn create(thread_env: ThreadEnv) -> Self {
+        Self {
+            daemons: Arc::new(Mutex::new(vec![])),
+            thread_env,
+            stop_wait_group: WaitGroup::new(),
+        }
+    }
+
+    /// Register a daemon thread to make sure it is correctly shut down when
+    /// the Raft instance is killed.
+    pub fn create_daemon<F, T>(&self, daemon: Daemon, func: F)
+    where
+        F: FnOnce() -> T,
+        F: Send + 'static,
+        T: Send + 'static,
+    {
+        let thread_env = self.thread_env.clone();
+        let stop_wait_group = self.stop_wait_group.clone();
+        let thread = std::thread::Builder::new()
+            .name(format!("ruaft-daemon-{:?}", daemon))
+            .spawn(move || {
+                thread_env.attach();
+                func();
+                ThreadEnv::detach();
+                drop(stop_wait_group);
+            })
+            .expect("Creating daemon thread should never fail");
+        self.daemons.lock().push((daemon, thread));
+    }
+
+    pub fn wait_for_daemons(self) {
+        self.stop_wait_group.wait();
+        self.thread_env.attach();
+        for (daemon, join_handle) in self.daemons.lock().drain(..) {
+            if let Some(err) = join_handle.join().err() {
+                let err_str = err.downcast_ref::<&str>().map(|s| s.to_owned());
+                let err_string =
+                    err.downcast_ref::<String>().map(|s| s.as_str());
+                let err =
+                    err_str.or(err_string).unwrap_or("unknown panic error");
+                ThreadEnv::upgrade().record_panic(daemon, err);
+            }
+        }
+        ThreadEnv::detach();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::daemon_env::DaemonEnv;
+
+    #[test]
+    fn test_watch_daemon_shutdown() {
+        let daemon_env = DaemonEnv::create();
+        let daemon_watch = DaemonWatch::create(daemon_env.for_thread());
+        daemon_watch.create_daemon(Daemon::ApplyCommand, || {
+            panic!("message with type &str");
+        });
+
+        daemon_watch.create_daemon(Daemon::Snapshot, || {
+            panic!("message with type {:?}", "debug string");
+        });
+
+        daemon_watch.wait_for_daemons();
+
+        let result = std::thread::spawn(move || {
+            daemon_env.shutdown();
+        })
+        .join();
+        let message = result.expect_err("shutdown should have panicked");
+        let message = message
+            .downcast_ref::<String>()
+            .expect("Error message should be a string.");
+        assert_eq!(
+            message,
+            "\n2 daemon panic(s):\nDaemon ApplyCommand panicked: message with type &str\nDaemon Snapshot panicked: message with type \"debug string\"\n0 error(s):\n"
+        );
+    }
+}

+ 3 - 3
src/election.rs

@@ -5,7 +5,7 @@ use std::time::{Duration, Instant};
 use parking_lot::{Condvar, Mutex};
 use rand::{thread_rng, Rng};
 
-use crate::daemon_env::Daemon;
+use crate::daemon_watch::Daemon;
 use crate::sync_log_entries::SyncLogEntriesComms;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, RPC_DEADLINE};
@@ -214,8 +214,8 @@ impl<Command: ReplicableCommand> Raft<Command> {
 
             log::info!("{:?} election timer daemon done.", this.me);
         };
-        self.daemon_env
-            .watch_daemon(Daemon::ElectionTimer, election_daemon);
+        self.daemon_watch
+            .create_daemon(Daemon::ElectionTimer, election_daemon);
     }
 
     fn run_election(

+ 1 - 0
src/lib.rs

@@ -16,6 +16,7 @@ pub(crate) use crate::raft_state::State;
 mod apply_command;
 mod beat_ticker;
 mod daemon_env;
+mod daemon_watch;
 mod election;
 mod heartbeats;
 mod index_term;

+ 7 - 9
src/raft.rs

@@ -7,6 +7,7 @@ use serde_derive::{Deserialize, Serialize};
 
 use crate::apply_command::ApplyCommandFnMut;
 use crate::daemon_env::{DaemonEnv, ThreadEnv};
+use crate::daemon_watch::DaemonWatch;
 use crate::election::ElectionState;
 use crate::heartbeats::{HeartbeatsDaemon, HEARTBEAT_INTERVAL};
 use crate::persister::PersistedRaftState;
@@ -43,6 +44,7 @@ pub struct Raft<Command> {
     pub(crate) heartbeats_daemon: HeartbeatsDaemon,
 
     pub(crate) thread_pool: utils::ThreadPoolHolder,
+    pub(crate) daemon_watch: DaemonWatch,
 
     pub(crate) daemon_env: DaemonEnv,
 }
@@ -100,6 +102,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             .on_thread_stop(ThreadEnv::detach)
             .build()
             .expect("Creating thread pool should not fail");
+        let daemon_watch = DaemonWatch::create(daemon_env.for_thread());
         let peers = peers
             .into_iter()
             .map(|r| Arc::new(r) as Arc<dyn RemoteRaft<Command>>)
@@ -120,6 +123,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             verify_authority_daemon: VerifyAuthorityDaemon::create(peer_size),
             heartbeats_daemon: HeartbeatsDaemon::create(),
             thread_pool: utils::ThreadPoolHolder::new(thread_pool),
+            daemon_watch,
             daemon_env,
         };
 
@@ -172,21 +176,15 @@ impl<Command: ReplicableCommand> Raft<Command> {
 
     /// Cleanly shutdown this instance. This function never blocks forever. It
     /// either panics or returns eventually.
-    pub fn kill(mut self) {
+    pub fn kill(self) {
         self.keep_running.store(false, Ordering::Release);
         self.election.stop_election_timer();
         self.sync_log_entries_comms.kill();
         self.apply_command_signal.notify_all();
         self.snapshot_daemon.kill();
         self.verify_authority_daemon.kill();
-        // We cannot easily combine stop_wait_group into DaemonEnv because of
-        // shutdown dependencies. The thread pool is not managed by DaemonEnv,
-        // but it cannot be shutdown until all daemons are. On the other hand
-        // the thread pool uses DaemonEnv, thus must be shutdown before
-        // DaemonEnv. The shutdown sequence is stop_wait_group -> thread_pool
-        // -> DaemonEnv. The first and third cannot be combined with the second
-        // in the middle.
-        self.daemon_env.wait_for_daemons();
+
+        self.daemon_watch.wait_for_daemons();
         self.thread_pool
             .take()
             .expect(

+ 4 - 3
src/snapshot.rs

@@ -5,7 +5,8 @@ use crossbeam_utils::sync::{Parker, Unparker};
 use parking_lot::{Condvar, Mutex};
 
 use crate::check_or_record;
-use crate::daemon_env::{Daemon, ErrorKind};
+use crate::daemon_env::ErrorKind;
+use crate::daemon_watch::Daemon;
 use crate::{Index, Raft};
 
 #[derive(Clone, Debug, Default)]
@@ -217,7 +218,7 @@ impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
                 );
             }
         };
-        self.daemon_env
-            .watch_daemon(Daemon::Snapshot, snapshot_daemon);
+        self.daemon_watch
+            .create_daemon(Daemon::Snapshot, snapshot_daemon);
     }
 }

+ 4 - 3
src/sync_log_entries.rs

@@ -3,7 +3,8 @@ use std::sync::Arc;
 
 use parking_lot::{Condvar, Mutex};
 
-use crate::daemon_env::{Daemon, ErrorKind};
+use crate::daemon_env::ErrorKind;
+use crate::daemon_watch::Daemon;
 use crate::heartbeats::HEARTBEAT_INTERVAL;
 use crate::peer_progress::PeerProgress;
 use crate::term_marker::TermMarker;
@@ -160,8 +161,8 @@ impl<Command: ReplicableCommand> Raft<Command> {
 
             log::info!("{:?} sync log entries daemon done.", this.me);
         };
-        self.daemon_env
-            .watch_daemon(Daemon::SyncLogEntries, sync_log_entry_daemon);
+        self.daemon_watch
+            .create_daemon(Daemon::SyncLogEntries, sync_log_entry_daemon);
     }
 
     /// Syncs log entries to a peer once, requests a new sync if that fails.

+ 3 - 3
src/verify_authority.rs

@@ -7,7 +7,7 @@ use std::time::{Duration, Instant};
 use parking_lot::{Condvar, Mutex};
 
 use crate::beat_ticker::{Beat, SharedBeatTicker};
-use crate::daemon_env::Daemon;
+use crate::daemon_watch::Daemon;
 use crate::heartbeats::HEARTBEAT_INTERVAL;
 use crate::{Index, Raft, Term};
 
@@ -351,8 +351,8 @@ impl<Command: 'static + Send> Raft<Command> {
             }
             log::info!("{:?} verify authority daemon done.", me);
         };
-        self.daemon_env
-            .watch_daemon(Daemon::VerifyAuthority, verify_authority_daemon);
+        self.daemon_watch
+            .create_daemon(Daemon::VerifyAuthority, verify_authority_daemon);
     }
 
     /// Create a verify authority request. Returns None if we are not the