Browse Source

Move network to use tokio timer and threadpool.

It turns out tokio timer only works on tokio threadpools. Futures
seems to work seamlessly between a futures-rs threadpools on the
server side, and a tokio threadpool on the network side.
Jing Yang 5 năm trước cách đây
mục cha
commit
282c4dc6e5
3 tập tin đã thay đổi với 11 bổ sung9 xóa
  1. 2 2
      Cargo.toml
  2. 1 1
      src/lib.rs
  3. 8 6
      src/network.rs

+ 2 - 2
Cargo.toml

@@ -6,6 +6,6 @@ edition = "2018"
 
 [dependencies]
 bytes = "0.5.6"
-futures = { version = "0.3.5", features =[ "thread-pool" ] }
+futures = { version = "0.3.5", features = ["thread-pool"] }
 rand = "0.7.3"
-tokio-timer = "0.3.0-alpha.6"
+tokio = { version = "0.2.22", features = ["rt-threaded", "time"] }

+ 1 - 1
src/lib.rs

@@ -1,7 +1,7 @@
 extern crate bytes;
 extern crate futures;
 extern crate rand;
-extern crate tokio_timer;
+extern crate tokio;
 
 mod client;
 mod network;

+ 8 - 6
src/network.rs

@@ -129,7 +129,7 @@ impl Network {
     const SHUTDOWN_DELAY: Duration = Duration::from_micros(20);
 
     async fn delay_for_millis(milli_seconds: u64) {
-        tokio_timer::delay_for(Duration::from_millis(milli_seconds)).await;
+        tokio::time::delay_for(Duration::from_millis(milli_seconds)).await;
     }
 
     async fn serve_rpc(network: Arc<Mutex<Self>>, rpc: RpcOnWire) {
@@ -231,10 +231,12 @@ impl Network {
 
         let network = Arc::new(Mutex::new(network));
 
-        let thread_pool = futures::executor::ThreadPool::builder()
-            .pool_size(20)
-            .name_prefix("network")
-            .create()
+        let thread_pool = tokio::runtime::Builder::new()
+            .core_threads(10)
+            .max_threads(20)
+            .thread_name("network")
+            .enable_time()
+            .build()
             .expect("Creating network thread pool should not fail");
 
         let other = network.clone();
@@ -260,7 +262,7 @@ impl Network {
                 match rx.try_recv() {
                     Ok(rpc) => {
                         thread_pool
-                            .spawn_ok(Self::serve_rpc(network.clone(), rpc));
+                            .spawn(Self::serve_rpc(network.clone(), rpc));
                     }
                     // All senders have disconnected. This should never happen,
                     // since the network instance itself holds a sender.