Kaynağa Gözat

Add daemon starting and exiting log.

Jing Yang 4 yıl önce
ebeveyn
işleme
85d5da0864
4 değiştirilmiş dosya ile 14 ekleme ve 0 silme
  1. 3 0
      src/apply_command.rs
  2. 3 0
      src/election.rs
  3. 4 0
      src/snapshot.rs
  4. 4 0
      src/sync_log_entries.rs

+ 3 - 0
src/apply_command.rs

@@ -53,6 +53,7 @@ where
         mut apply_command: impl ApplyCommandFnMut<Command>,
     ) {
         let keep_running = self.keep_running.clone();
+        let me = self.me;
         let rf = self.inner_state.clone();
         let condvar = self.apply_command_signal.clone();
         let snapshot_daemon = self.snapshot_daemon.clone();
@@ -61,6 +62,7 @@ where
         let join_handle = std::thread::spawn(move || {
             // Note: do not change this to `let _ = ...`.
             let _guard = daemon_env.for_scope();
+            log::info!("{:?} apply command daemon running ...", me);
 
             while keep_running.load(Ordering::SeqCst) {
                 let messages = {
@@ -112,6 +114,7 @@ where
                     snapshot_daemon.trigger();
                 }
             }
+            log::info!("{:?} apply command daemon done.", me);
 
             drop(stop_wait_group);
         });

+ 3 - 0
src/election.rs

@@ -123,6 +123,7 @@ where
         let join_handle = std::thread::spawn(move || {
             // Note: do not change this to `let _ = ...`.
             let _guard = this.daemon_env.for_scope();
+            log::info!("{:?} election timer daemon running ...", this.me);
 
             let election = this.election.clone();
 
@@ -201,6 +202,8 @@ where
                 cancel_handle.map(|c| c.send(()));
             }
 
+            log::info!("{:?} election timer daemon done.", this.me);
+
             let stop_wait_group = this.stop_wait_group.clone();
             // Making sure the rest of `this` is dropped before the wait group.
             drop(this);

+ 4 - 0
src/snapshot.rs

@@ -112,18 +112,22 @@ impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
         self.snapshot_daemon.unparker.replace(unparker.clone());
 
         let keep_running = self.keep_running.clone();
+        let me = self.me;
         let rf = self.inner_state.clone();
         let persister = self.persister.clone();
         let snapshot_daemon = self.snapshot_daemon.clone();
         let daemon_env = self.daemon_env.clone();
         let stop_wait_group = self.stop_wait_group.clone();
 
+        log::info!("{:?} snapshot daemon running ...", me);
         let join_handle = std::thread::spawn(move || loop {
             // Note: do not change this to `let _ = ...`.
             let _guard = daemon_env.for_scope();
 
             parker.park();
             if !keep_running.load(Ordering::SeqCst) {
+                log::info!("{:?} snapshot daemon done.", me);
+
                 // Explicitly drop every thing.
                 drop(keep_running);
                 drop(rf);

+ 4 - 0
src/sync_log_entries.rs

@@ -66,6 +66,8 @@ where
             // Note: do not change this to `let _ = ...`.
             let _guard = this.daemon_env.for_scope();
 
+            log::info!("{:?} sync log entries daemon running ...", this.me);
+
             let mut openings = vec![];
             openings.resize_with(this.peers.len(), || {
                 Opening(Arc::new(AtomicUsize::new(0)))
@@ -99,6 +101,8 @@ where
                 }
             }
 
+            log::info!("{:?} sync log entries daemon done.", this.me);
+
             let stop_wait_group = this.stop_wait_group.clone();
             // Making sure the rest of `this` is dropped before the wait group.
             drop(this);