Bläddra i källkod

Replace Arc<Runtime> with a custom thread pool holder.

This change replaces an indirect ownership (Arc<Runtime>) with a direct
ownership (ThreadPoolHolder owns a Handler), and thus improves efficiency.

A Runtime and a Handle are essentially the same thing under the hood. The
difference is that the Runtime conceptually represent the thread pool. Dropping
the runtime implies shutting it down. Thus the Runtime cannot be cloned.

What we need in Raft is a clonable handle, not a Runtime. However we still need
the runtime for shutdown. Thus the runtime is submit to a global runtime jail.
We only retrieve the runtime during shutdown, when we are sure all references
have been dropped.
Jing Yang 3 år sedan
förälder
incheckning
28082ff335
3 ändrade filer med 43 tillägg och 3 borttagningar
  1. 1 0
      Cargo.toml
  2. 4 3
      src/raft.rs
  3. 38 0
      src/utils.rs

+ 1 - 0
Cargo.toml

@@ -19,6 +19,7 @@ bytes = "1.1"
 crossbeam-utils = "0.8"
 futures-channel = "0.3.21"
 futures-util = "0.3.21"
+lazy_static = "1.4.0"
 log = "0.4"
 parking_lot = "0.12"
 rand = "0.8"

+ 4 - 3
src/raft.rs

@@ -41,7 +41,7 @@ pub struct Raft<Command> {
     pub(crate) verify_authority_daemon: VerifyAuthorityDaemon,
     pub(crate) heartbeats_daemon: HeartbeatsDaemon,
 
-    pub(crate) thread_pool: Arc<tokio::runtime::Runtime>,
+    pub(crate) thread_pool: utils::ThreadPoolHolder,
 
     pub(crate) daemon_env: DaemonEnv,
     pub(crate) stop_wait_group: WaitGroup,
@@ -117,7 +117,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             snapshot_daemon: SnapshotDaemon::create(),
             verify_authority_daemon: VerifyAuthorityDaemon::create(peer_size),
             heartbeats_daemon: HeartbeatsDaemon::create(),
-            thread_pool: Arc::new(thread_pool),
+            thread_pool: utils::ThreadPoolHolder::new(thread_pool),
             daemon_env,
             stop_wait_group: WaitGroup::new(),
         };
@@ -184,7 +184,8 @@ impl<Command: ReplicableCommand> Raft<Command> {
         // -> DaemonEnv. The first and third cannot be combined with the second
         // in the middle.
         self.stop_wait_group.wait();
-        std::sync::Arc::try_unwrap(self.thread_pool)
+        self.thread_pool
+            .take()
             .expect(
                 "All references to the thread pool should have been dropped.",
             )

+ 38 - 0
src/utils.rs

@@ -129,3 +129,41 @@ impl<T> From<SharedSender<T>> for std::sync::mpsc::Sender<T> {
         this.0
     }
 }
+
+lazy_static::lazy_static! {
+    static ref THREAD_POOLS: parking_lot::Mutex<std::collections::HashMap<u64, tokio::runtime::Runtime>> =
+        parking_lot::Mutex::new(std::collections::HashMap::new());
+}
+
+#[derive(Clone)]
+pub(crate) struct ThreadPoolHolder {
+    id: u64,
+    handle: tokio::runtime::Handle,
+}
+
+impl ThreadPoolHolder {
+    pub fn new(runtime: tokio::runtime::Runtime) -> Self {
+        let handle = runtime.handle().clone();
+        loop {
+            let id: u64 = rand::random();
+            if let std::collections::hash_map::Entry::Vacant(v) =
+                THREAD_POOLS.lock().entry(id)
+            {
+                v.insert(runtime);
+                break Self { id, handle };
+            }
+        }
+    }
+
+    pub fn take(self) -> Option<tokio::runtime::Runtime> {
+        THREAD_POOLS.lock().remove(&self.id)
+    }
+}
+
+impl std::ops::Deref for ThreadPoolHolder {
+    type Target = tokio::runtime::Handle;
+
+    fn deref(&self) -> &Self::Target {
+        &self.handle
+    }
+}