Browse Source

Polishing the threading model based on experiments.

1. Use futures::executor::block_on() in tests.
2. Use spawn() instead of spawn_blocking().
3. Add more threads to mitigate one server blocking everyone.
Jing Yang 4 năm trước cách đây
mục cha
commit
838ba65f32
2 tập tin đã thay đổi với 14 bổ sung2 xóa
  1. 6 0
      src/network.rs
  2. 8 2
      src/server.rs

+ 6 - 0
src/network.rs

@@ -267,6 +267,8 @@ impl Network {
         mark_trace!(rpc.trace, served);
     }
 
+    const WORKER_THREADS: usize = 16;
+
     pub fn run_daemon() -> Arc<Mutex<Network>> {
         let (network, rx) = Network::new();
 
@@ -275,6 +277,7 @@ impl Network {
 
         // Using tokio instead of futures-rs, because we need timer futures.
         let thread_pool = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(Self::WORKER_THREADS)
             .thread_name("network")
             .enable_time()
             .build()
@@ -665,6 +668,9 @@ mod tests {
                     results.push(reply);
                 }
                 for result in results {
+                    // Futures executor is used instead of tokio executor,
+                    // because it uses std::thread::park(), which is more
+                    // straightforward and faster than mutex + condvar.
                     futures::executor::block_on(result)
                         .expect("All futures should succeed");
                 }

+ 8 - 2
src/server.rs

@@ -39,7 +39,7 @@ impl Server {
         mark_trace!(trace, before_server_scheduling);
         #[cfg(feature = "tracing")]
         let trace_clone = trace.clone();
-        let result = this.thread_pool.as_ref().unwrap().spawn(async move {
+        let runner = move || {
             let rpc_handler = {
                 // Blocking on a mutex in a thread pool. Sounds horrible, but
                 // in fact quite safe, given that the critical section is short.
@@ -66,7 +66,13 @@ impl Server {
                 Ok(Err(e)) => resume_unwind(e),
                 Err(e) => Err(e),
             };
-        });
+        };
+        let thread_pool = this.thread_pool.as_ref().unwrap();
+        // Using spawn() instead of spawn_blocking(), because the spawn() is
+        // better at handling a large number of small workloads. Running
+        // blocking code on async runner is fine, since all of the tasks we run
+        // on this pool are blocking (for a limited time).
+        let result = thread_pool.spawn(async { runner() });
         mark_trace!(trace, after_server_scheduling);
         let result = tokio::select! {
             result = result => Some(result),