Просмотр исходного кода

Merge stop_wait_group into daemon env.

Jing Yang 3 лет назад
Родитель
Сommit
6d22246e50
7 измененных файлов с 23 добавлено и 23 удалено
  1. 0 3
      src/apply_command.rs
  2. 22 1
      src/daemon_env.rs
  3. 0 5
      src/election.rs
  4. 1 4
      src/raft.rs
  5. 0 2
      src/snapshot.rs
  6. 0 5
      src/sync_log_entries.rs
  7. 0 3
      src/verify_authority.rs

+ 0 - 3
src/apply_command.rs

@@ -55,7 +55,6 @@ impl<Command: ReplicableCommand> Raft<Command> {
         let rf = self.inner_state.clone();
         let rf = self.inner_state.clone();
         let condvar = self.apply_command_signal.clone();
         let condvar = self.apply_command_signal.clone();
         let snapshot_daemon = self.snapshot_daemon.clone();
         let snapshot_daemon = self.snapshot_daemon.clone();
-        let stop_wait_group = self.stop_wait_group.clone();
         let apply_command_daemon = move || {
         let apply_command_daemon = move || {
             log::info!("{:?} apply command daemon running ...", me);
             log::info!("{:?} apply command daemon running ...", me);
 
 
@@ -111,8 +110,6 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 }
                 }
             }
             }
             log::info!("{:?} apply command daemon done.", me);
             log::info!("{:?} apply command daemon done.", me);
-
-            drop(stop_wait_group);
         };
         };
         self.daemon_env
         self.daemon_env
             .watch_daemon(Daemon::ApplyCommand, apply_command_daemon);
             .watch_daemon(Daemon::ApplyCommand, apply_command_daemon);

+ 22 - 1
src/daemon_env.rs

@@ -1,6 +1,7 @@
 use std::cell::RefCell;
 use std::cell::RefCell;
 use std::sync::{Arc, Weak};
 use std::sync::{Arc, Weak};
 
 
+use crossbeam_utils::sync::WaitGroup;
 use parking_lot::Mutex;
 use parking_lot::Mutex;
 
 
 #[cfg(all(not(test), feature = "integration-test"))]
 #[cfg(all(not(test), feature = "integration-test"))]
@@ -33,6 +34,7 @@ macro_rules! check_or_record {
 pub(crate) struct DaemonEnv {
 pub(crate) struct DaemonEnv {
     data: Arc<Mutex<DaemonEnvData>>,
     data: Arc<Mutex<DaemonEnvData>>,
     thread_env: ThreadEnv,
     thread_env: ThreadEnv,
+    stop_wait_group: Option<WaitGroup>,
 }
 }
 
 
 #[derive(Debug)]
 #[derive(Debug)]
@@ -126,17 +128,30 @@ impl DaemonEnv {
         T: Send + 'static,
         T: Send + 'static,
     {
     {
         let thread_env = self.for_thread();
         let thread_env = self.for_thread();
+        let stop_wait_group = self
+            .stop_wait_group
+            .clone()
+            .expect("Expecting a valid stop wait group when creating daemons");
         let thread = std::thread::Builder::new()
         let thread = std::thread::Builder::new()
             .name(format!("ruaft-daemon-{:?}", daemon))
             .name(format!("ruaft-daemon-{:?}", daemon))
             .spawn(move || {
             .spawn(move || {
                 thread_env.attach();
                 thread_env.attach();
                 func();
                 func();
                 ThreadEnv::detach();
                 ThreadEnv::detach();
+                drop(stop_wait_group);
             })
             })
             .expect("Creating daemon thread should never fail");
             .expect("Creating daemon thread should never fail");
         self.data.lock().daemons.push((daemon, thread));
         self.data.lock().daemons.push((daemon, thread));
     }
     }
 
 
+    pub fn wait_for_daemons(&mut self) {
+        if let Some(stop_wait_group) = self.stop_wait_group.take() {
+            stop_wait_group.wait();
+        } else {
+            panic!("Daemons can only be waited once")
+        }
+    }
+
     /// Makes sure that all daemons have been shutdown, no more errors can be
     /// Makes sure that all daemons have been shutdown, no more errors can be
     /// added, checks if any error has been added, or if any daemon panicked.
     /// added, checks if any error has been added, or if any daemon panicked.
     pub fn shutdown(self) {
     pub fn shutdown(self) {
@@ -224,7 +239,12 @@ impl DaemonEnv {
             #[cfg(all(not(test), feature = "integration-test"))]
             #[cfg(all(not(test), feature = "integration-test"))]
             local_logger: thread_local_logger::get(),
             local_logger: thread_local_logger::get(),
         };
         };
-        Self { data, thread_env }
+
+        Self {
+            data,
+            thread_env,
+            stop_wait_group: Some(WaitGroup::new()),
+        }
     }
     }
 
 
     /// Creates a [`ThreadEnv`] that could be attached to a thread. Any code
     /// Creates a [`ThreadEnv`] that could be attached to a thread. Any code
@@ -278,6 +298,7 @@ impl ThreadEnv {
         DaemonEnv {
         DaemonEnv {
             data: env.data.upgrade().unwrap(),
             data: env.data.upgrade().unwrap(),
             thread_env: env,
             thread_env: env,
+            stop_wait_group: None,
         }
         }
     }
     }
 
 

+ 0 - 5
src/election.rs

@@ -204,11 +204,6 @@ impl<Command: ReplicableCommand> Raft<Command> {
             }
             }
 
 
             log::info!("{:?} election timer daemon done.", this.me);
             log::info!("{:?} election timer daemon done.", this.me);
-
-            let stop_wait_group = this.stop_wait_group.clone();
-            // Making sure the rest of `this` is dropped before the wait group.
-            drop(this);
-            drop(stop_wait_group);
         };
         };
         self.daemon_env
         self.daemon_env
             .watch_daemon(Daemon::ElectionTimer, election_daemon);
             .watch_daemon(Daemon::ElectionTimer, election_daemon);

+ 1 - 4
src/raft.rs

@@ -1,7 +1,6 @@
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::sync::Arc;
 
 
-use crossbeam_utils::sync::WaitGroup;
 use parking_lot::{Condvar, Mutex};
 use parking_lot::{Condvar, Mutex};
 use serde_derive::{Deserialize, Serialize};
 use serde_derive::{Deserialize, Serialize};
 
 
@@ -44,7 +43,6 @@ pub struct Raft<Command> {
     pub(crate) thread_pool: utils::ThreadPoolHolder,
     pub(crate) thread_pool: utils::ThreadPoolHolder,
 
 
     pub(crate) daemon_env: DaemonEnv,
     pub(crate) daemon_env: DaemonEnv,
-    pub(crate) stop_wait_group: WaitGroup,
 }
 }
 
 
 impl<Command: ReplicableCommand> Raft<Command> {
 impl<Command: ReplicableCommand> Raft<Command> {
@@ -119,7 +117,6 @@ impl<Command: ReplicableCommand> Raft<Command> {
             heartbeats_daemon: HeartbeatsDaemon::create(),
             heartbeats_daemon: HeartbeatsDaemon::create(),
             thread_pool: utils::ThreadPoolHolder::new(thread_pool),
             thread_pool: utils::ThreadPoolHolder::new(thread_pool),
             daemon_env,
             daemon_env,
-            stop_wait_group: WaitGroup::new(),
         };
         };
 
 
         // Running in a standalone thread.
         // Running in a standalone thread.
@@ -183,7 +180,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         // DaemonEnv. The shutdown sequence is stop_wait_group -> thread_pool
         // DaemonEnv. The shutdown sequence is stop_wait_group -> thread_pool
         // -> DaemonEnv. The first and third cannot be combined with the second
         // -> DaemonEnv. The first and third cannot be combined with the second
         // in the middle.
         // in the middle.
-        self.stop_wait_group.wait();
+        self.daemon_env.wait_for_daemons();
         self.thread_pool
         self.thread_pool
             .take()
             .take()
             .expect(
             .expect(

+ 0 - 2
src/snapshot.rs

@@ -142,7 +142,6 @@ impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
         let rf = self.inner_state.clone();
         let rf = self.inner_state.clone();
         let persister = self.persister.clone();
         let persister = self.persister.clone();
         let snapshot_daemon = self.snapshot_daemon.clone();
         let snapshot_daemon = self.snapshot_daemon.clone();
-        let stop_wait_group = self.stop_wait_group.clone();
 
 
         log::info!("{:?} snapshot daemon running ...", me);
         log::info!("{:?} snapshot daemon running ...", me);
         let snapshot_daemon = move || loop {
         let snapshot_daemon = move || loop {
@@ -155,7 +154,6 @@ impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
                 drop(rf);
                 drop(rf);
                 drop(persister);
                 drop(persister);
                 drop(snapshot_daemon);
                 drop(snapshot_daemon);
-                drop(stop_wait_group);
                 break;
                 break;
             }
             }
             if persister.state_size() >= max_state_size {
             if persister.state_size() >= max_state_size {

+ 0 - 5
src/sync_log_entries.rs

@@ -102,11 +102,6 @@ impl<Command: ReplicableCommand> Raft<Command> {
             }
             }
 
 
             log::info!("{:?} sync log entries daemon done.", this.me);
             log::info!("{:?} sync log entries daemon done.", this.me);
-
-            let stop_wait_group = this.stop_wait_group.clone();
-            // Making sure the rest of `this` is dropped before the wait group.
-            drop(this);
-            drop(stop_wait_group);
         };
         };
         self.daemon_env
         self.daemon_env
             .watch_daemon(Daemon::SyncLogEntries, sync_log_entry_daemon);
             .watch_daemon(Daemon::SyncLogEntries, sync_log_entry_daemon);

+ 0 - 3
src/verify_authority.rs

@@ -377,7 +377,6 @@ impl<Command: 'static + Send> Raft<Command> {
         let keep_running = self.keep_running.clone();
         let keep_running = self.keep_running.clone();
         let this_daemon = self.verify_authority_daemon.clone();
         let this_daemon = self.verify_authority_daemon.clone();
         let rf = self.inner_state.clone();
         let rf = self.inner_state.clone();
-        let stop_wait_group = self.stop_wait_group.clone();
 
 
         let verify_authority_daemon = move || {
         let verify_authority_daemon = move || {
             log::info!("{:?} verify authority daemon running ...", me);
             log::info!("{:?} verify authority daemon running ...", me);
@@ -394,8 +393,6 @@ impl<Command: 'static + Send> Raft<Command> {
                 );
                 );
             }
             }
             log::info!("{:?} verify authority daemon done.", me);
             log::info!("{:?} verify authority daemon done.", me);
-
-            drop(stop_wait_group);
         };
         };
         self.daemon_env
         self.daemon_env
             .watch_daemon(Daemon::VerifyAuthority, verify_authority_daemon);
             .watch_daemon(Daemon::VerifyAuthority, verify_authority_daemon);