فهرست منبع

Merge daemon_env guard into daemons.

Jing Yang 3 سال پیش
والد
کامیت
f230c518c0
6فایلهای تغییر یافته به همراه3 افزوده شده و 17 حذف شده
  1. 0 3
      src/apply_command.rs
  2. 3 0
      src/daemon_env.rs
  3. 0 2
      src/election.rs
  4. 0 5
      src/snapshot.rs
  5. 0 3
      src/sync_log_entries.rs
  6. 0 4
      src/verify_authority.rs

+ 0 - 3
src/apply_command.rs

@@ -55,11 +55,8 @@ impl<Command: ReplicableCommand> Raft<Command> {
         let rf = self.inner_state.clone();
         let condvar = self.apply_command_signal.clone();
         let snapshot_daemon = self.snapshot_daemon.clone();
-        let daemon_env = self.daemon_env.clone();
         let stop_wait_group = self.stop_wait_group.clone();
         let apply_command_daemon = move || {
-            // Note: do not change this to `let _ = ...`.
-            let _guard = daemon_env.for_scope();
             log::info!("{:?} apply command daemon running ...", me);
 
             while keep_running.load(Ordering::SeqCst) {

+ 3 - 0
src/daemon_env.rs

@@ -125,10 +125,13 @@ impl DaemonEnv {
         F: Send + 'static,
         T: Send + 'static,
     {
+        let thread_env = self.for_thread();
         let thread = std::thread::Builder::new()
             .name(format!("ruaft-daemon-{:?}", daemon))
             .spawn(move || {
+                thread_env.attach();
                 func();
+                ThreadEnv::detach();
             })
             .expect("Creating daemon thread should never fail");
         self.data.lock().daemons.push((daemon, thread));

+ 0 - 2
src/election.rs

@@ -122,8 +122,6 @@ impl<Command: ReplicableCommand> Raft<Command> {
         let this = self.clone();
 
         let election_daemon = move || {
-            // Note: do not change this to `let _ = ...`.
-            let _guard = this.daemon_env.for_scope();
             log::info!("{:?} election timer daemon running ...", this.me);
 
             let election = this.election.clone();

+ 0 - 5
src/snapshot.rs

@@ -142,14 +142,10 @@ impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
         let rf = self.inner_state.clone();
         let persister = self.persister.clone();
         let snapshot_daemon = self.snapshot_daemon.clone();
-        let daemon_env = self.daemon_env.clone();
         let stop_wait_group = self.stop_wait_group.clone();
 
         log::info!("{:?} snapshot daemon running ...", me);
         let snapshot_daemon = move || loop {
-            // Note: do not change this to `let _ = ...`.
-            let _guard = daemon_env.for_scope();
-
             parker.park();
             if !keep_running.load(Ordering::SeqCst) {
                 log::info!("{:?} snapshot daemon done.", me);
@@ -159,7 +155,6 @@ impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
                 drop(rf);
                 drop(persister);
                 drop(snapshot_daemon);
-                drop(daemon_env);
                 drop(stop_wait_group);
                 break;
             }

+ 0 - 3
src/sync_log_entries.rs

@@ -62,9 +62,6 @@ impl<Command: ReplicableCommand> Raft<Command> {
         // Clone everything that the thread needs.
         let this = self.clone();
         let sync_log_entry_daemon = move || {
-            // Note: do not change this to `let _ = ...`.
-            let _guard = this.daemon_env.for_scope();
-
             log::info!("{:?} sync log entries daemon running ...", this.me);
 
             let mut openings = vec![];

+ 0 - 4
src/verify_authority.rs

@@ -375,15 +375,11 @@ impl<Command: 'static + Send> Raft<Command> {
     pub(crate) fn run_verify_authority_daemon(&self) {
         let me = self.me;
         let keep_running = self.keep_running.clone();
-        let daemon_env = self.daemon_env.clone();
         let this_daemon = self.verify_authority_daemon.clone();
         let rf = self.inner_state.clone();
         let stop_wait_group = self.stop_wait_group.clone();
 
         let verify_authority_daemon = move || {
-            // Note: do not change this to `let _ = ...`.
-            let _guard = daemon_env.for_scope();
-
             log::info!("{:?} verify authority daemon running ...", me);
             while keep_running.load(Ordering::Acquire) {
                 this_daemon.wait_for(Self::BEAT_RECORDING_MAX_PAUSE);