소스 검색

Move apply_command.rs to using the standalone daemon_watch.

Jing Yang 3 년 전
부모
커밋
496ad0d7d7
2개의 변경된 파일9개의 추가작업 그리고 9개의 파일을 삭제
  1. 3 6
      src/apply_command.rs
  2. 6 3
      src/raft.rs

+ 3 - 6
src/apply_command.rs

@@ -1,6 +1,5 @@
 use std::sync::atomic::Ordering;
 
-use crate::daemon_watch::Daemon;
 use crate::heartbeats::HEARTBEAT_INTERVAL;
 use crate::{Index, Raft, ReplicableCommand, Snapshot};
 
@@ -49,13 +48,13 @@ impl<Command: ReplicableCommand> Raft<Command> {
     pub(crate) fn run_apply_command_daemon(
         &self,
         mut apply_command: impl ApplyCommandFnMut<Command>,
-    ) {
+    ) -> impl FnOnce() {
         let keep_running = self.keep_running.clone();
         let me = self.me;
         let rf = self.inner_state.clone();
         let condvar = self.apply_command_signal.clone();
         let snapshot_daemon = self.snapshot_daemon.clone();
-        let apply_command_daemon = move || {
+        move || {
             log::info!("{:?} apply command daemon running ...", me);
 
             while keep_running.load(Ordering::Relaxed) {
@@ -110,8 +109,6 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 }
             }
             log::info!("{:?} apply command daemon done.", me);
-        };
-        self.daemon_watch
-            .create_daemon(Daemon::ApplyCommand, apply_command_daemon);
+        }
     }
 }

+ 6 - 3
src/raft.rs

@@ -7,7 +7,7 @@ use serde_derive::{Deserialize, Serialize};
 
 use crate::apply_command::ApplyCommandFnMut;
 use crate::daemon_env::{DaemonEnv, ThreadEnv};
-use crate::daemon_watch::DaemonWatch;
+use crate::daemon_watch::{Daemon, DaemonWatch};
 use crate::election::ElectionState;
 use crate::heartbeats::{HeartbeatsDaemon, HEARTBEAT_INTERVAL};
 use crate::persister::PersistedRaftState;
@@ -123,7 +123,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             heartbeats_daemon: HeartbeatsDaemon::create(),
             thread_pool: thread_pool.handle().clone(),
             runtime: Arc::new(Mutex::new(Some(thread_pool))),
-            daemon_watch,
+            daemon_watch: daemon_watch.clone(),
             daemon_env,
         };
 
@@ -134,7 +134,10 @@ impl<Command: ReplicableCommand> Raft<Command> {
         // Running in a standalone thread.
         this.run_log_entry_daemon(sync_log_entries_daemon);
         // Running in a standalone thread.
-        this.run_apply_command_daemon(apply_command);
+        daemon_watch.create_daemon(
+            Daemon::ApplyCommand,
+            this.run_apply_command_daemon(apply_command),
+        );
         // One off function that schedules many little tasks, running on the
         // internal thread pool.
         this.schedule_heartbeats(HEARTBEAT_INTERVAL);