소스 검색

Construct a join handle upfront.

Jing Yang 3 년 전
부모
커밋
a959f7a175
1개의 변경된 파일22개의 추가작업 그리고 13개의 파일을 삭제
  1. 22 13
      src/raft.rs

+ 22 - 13
src/raft.rs

@@ -1,3 +1,4 @@
+use crossbeam_utils::sync::WaitGroup;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::sync::Arc;
 use std::time::Duration;
 use std::time::Duration;
@@ -42,10 +43,10 @@ pub struct Raft<Command> {
     pub(crate) heartbeats_daemon: HeartbeatsDaemon,
     pub(crate) heartbeats_daemon: HeartbeatsDaemon,
 
 
     pub(crate) thread_pool: tokio::runtime::Handle,
     pub(crate) thread_pool: tokio::runtime::Handle,
-
-    pub(crate) runtime: Arc<Mutex<Option<tokio::runtime::Runtime>>>,
-    pub(crate) daemon_watch: DaemonWatch,
     pub(crate) daemon_env: DaemonEnv,
     pub(crate) daemon_env: DaemonEnv,
+
+    stop_wait_group: WaitGroup,
+    join_handle: Arc<Mutex<Option<RaftJoinHandle>>>,
 }
 }
 
 
 impl<Command: ReplicableCommand> Raft<Command> {
 impl<Command: ReplicableCommand> Raft<Command> {
@@ -122,9 +123,10 @@ impl<Command: ReplicableCommand> Raft<Command> {
             verify_authority_daemon: VerifyAuthorityDaemon::create(peer_size),
             verify_authority_daemon: VerifyAuthorityDaemon::create(peer_size),
             heartbeats_daemon: HeartbeatsDaemon::create(),
             heartbeats_daemon: HeartbeatsDaemon::create(),
             thread_pool: thread_pool.handle().clone(),
             thread_pool: thread_pool.handle().clone(),
-            runtime: Arc::new(Mutex::new(Some(thread_pool))),
-            daemon_watch: daemon_watch.clone(),
-            daemon_env,
+            stop_wait_group: WaitGroup::new(),
+            daemon_env: daemon_env.clone(),
+            // The join handle will be created later.
+            join_handle: Arc::new(Mutex::new(None)),
         };
         };
 
 
         // Running in a standalone thread.
         // Running in a standalone thread.
@@ -153,6 +155,15 @@ impl<Command: ReplicableCommand> Raft<Command> {
         // The last step is to start running election timer.
         // The last step is to start running election timer.
         daemon_watch
         daemon_watch
             .create_daemon(Daemon::ElectionTimer, this.run_election_timer());
             .create_daemon(Daemon::ElectionTimer, this.run_election_timer());
+
+        // Create the join handle
+        this.join_handle.lock().replace(RaftJoinHandle {
+            stop_wait_group: this.stop_wait_group.clone(),
+            thread_pool,
+            daemon_watch,
+            daemon_env,
+        });
+
         this
         this
     }
     }
 }
 }
@@ -194,12 +205,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         self.snapshot_daemon.kill();
         self.snapshot_daemon.kill();
         self.verify_authority_daemon.kill();
         self.verify_authority_daemon.kill();
 
 
-        let runtime = self.runtime.lock().take().unwrap();
-        RaftJoinHandle {
-            runtime,
-            daemon_watch: self.daemon_watch,
-            daemon_env: self.daemon_env,
-        }
+        self.join_handle.lock().take().unwrap()
     }
     }
 
 
     /// Returns the current term and whether we are the leader.
     /// Returns the current term and whether we are the leader.
@@ -223,6 +229,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
 /// commits will be created by this Raft instance.
 /// commits will be created by this Raft instance.
 #[must_use]
 #[must_use]
 pub struct RaftJoinHandle {
 pub struct RaftJoinHandle {
+    stop_wait_group: WaitGroup,
     thread_pool: tokio::runtime::Runtime,
     thread_pool: tokio::runtime::Runtime,
     daemon_watch: DaemonWatch,
     daemon_watch: DaemonWatch,
     daemon_env: DaemonEnv,
     daemon_env: DaemonEnv,
@@ -233,8 +240,10 @@ impl RaftJoinHandle {
         Duration::from_millis(HEARTBEAT_INTERVAL.as_millis() as u64 * 2);
         Duration::from_millis(HEARTBEAT_INTERVAL.as_millis() as u64 * 2);
 
 
     pub fn join(self) {
     pub fn join(self) {
+        // Wait for all Raft instances to be dropped.
+        self.stop_wait_group.wait();
         self.daemon_watch.wait_for_daemons();
         self.daemon_watch.wait_for_daemons();
-        self.runtime.shutdown_timeout(Self::SHUTDOWN_TIMEOUT);
+        self.thread_pool.shutdown_timeout(Self::SHUTDOWN_TIMEOUT);
         // DaemonEnv must be shutdown after the thread pool, since there might
         // DaemonEnv must be shutdown after the thread pool, since there might
         // be tasks logging errors in the pool.
         // be tasks logging errors in the pool.
         self.daemon_env.shutdown();
         self.daemon_env.shutdown();