|
|
@@ -301,11 +301,18 @@ impl<Command: 'static + Send> Raft<Command> {
|
|
|
let unparker = parker.unparker().clone();
|
|
|
self.verify_authority_daemon.unparker.replace(unparker);
|
|
|
|
|
|
+ let me = self.me.clone();
|
|
|
let keep_running = self.keep_running.clone();
|
|
|
+ let daemon_env = self.daemon_env.clone();
|
|
|
let this_daemon = self.verify_authority_daemon.clone();
|
|
|
let rf = self.inner_state.clone();
|
|
|
+ let stop_wait_group = self.stop_wait_group.clone();
|
|
|
|
|
|
let join_handle = std::thread::spawn(move || {
|
|
|
+ // Note: do not change this to `let _ = ...`.
|
|
|
+ let _guard = daemon_env.for_scope();
|
|
|
+
|
|
|
+ log::info!("{:?} verify authority daemon running ...", me);
|
|
|
while keep_running.load(Ordering::Acquire) {
|
|
|
parker.park_timeout(Self::BEAT_RECORDING_MAX_PAUSE);
|
|
|
let (current_term, commit_index, sentinel) = {
|
|
|
@@ -318,6 +325,9 @@ impl<Command: 'static + Send> Raft<Command> {
|
|
|
sentinel,
|
|
|
);
|
|
|
}
|
|
|
+ log::info!("{:?} verify authority daemon done.", me);
|
|
|
+
|
|
|
+ drop(stop_wait_group);
|
|
|
});
|
|
|
self.daemon_env
|
|
|
.watch_daemon(Daemon::VerifyAuthority, join_handle);
|