Quellcode durchsuchen

Run RPC servers in a different poll.

Jing Yang vor 4 Jahren
Ursprung
Commit
c843a8457a
1 geänderte Dateien mit 28 neuen und 8 gelöschten Zeilen
  1. 28 8
      durio/src/main.rs

+ 28 - 8
durio/src/main.rs

@@ -2,6 +2,7 @@ use std::net::SocketAddr;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 
+use kvraft::KVServer;
 use lazy_static::lazy_static;
 use serde_derive::{Deserialize, Serialize};
 use warp::Filter;
@@ -46,11 +47,7 @@ lazy_static! {
 
 const NOT_READY: &str = "Clerk is not ready";
 
-async fn run_web_server(me: usize) {
-    let kv_server = run_kv_instance(KV_ADDRS[me], RAFT_ADDRS.clone(), me)
-        .await
-        .expect("Running kv instance should not fail");
-
+async fn run_web_server(socket_addr: SocketAddr, kv_server: Arc<KVServer>) {
     let is_leader = warp::get()
         .and(warp::path!("kvstore" / "is_leader"))
         .map(move || format!("{:?}", kv_server.raft().get_state()));
@@ -99,7 +96,7 @@ async fn run_web_server(me: usize) {
         });
 
     let routes = is_leader.or(get).or(put).or(append);
-    warp::serve(routes).run(WEB_ADDRS[me]).await;
+    warp::serve(routes).run(socket_addr).await;
 }
 
 fn main() {
@@ -111,16 +108,39 @@ fn main() {
         .expect("An index of the current instance must be passed in");
     test_utils::init_log(format!("durio-instance-{}", me).as_str())
         .expect("Initiating log should not fail");
+
+    // Run RPC servers in a thread pool. This pool
+    // 1. Accepts incoming RPC connections for KV and Raft servers.
+    // 2. Sends out RPCs to other Raft instances.
+    // Timers are used by RPC handling code in the KV server.
     let local_logger = test_utils::thread_local_logger::get();
+    let rpc_server_thread_pool = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .thread_name("durio-rpc")
+        .on_thread_start(move || {
+            test_utils::thread_local_logger::set(local_logger.clone())
+        })
+        .build()
+        .expect("Creating thread pool should not fail");
 
+    let kv_server = rpc_server_thread_pool.block_on(async {
+        run_kv_instance(KV_ADDRS[me], RAFT_ADDRS.clone(), me)
+            .await
+            .expect("Running kv instance should not fail")
+    });
+
+    // Run web servers in a thread pool. This pool
+    // 1. Accepts incoming HTTP connections.
+    // 2. Sends out RPCs to KV instances, both local and remote.
+    let local_logger = test_utils::thread_local_logger::get();
     let thread_pool = tokio::runtime::Builder::new_multi_thread()
         .enable_all()
-        .thread_name("durio")
+        .thread_name("durio-web")
         .on_thread_start(move || {
             test_utils::thread_local_logger::set(local_logger.clone())
         })
         .build()
         .expect("Creating thread pool should not fail");
 
-    thread_pool.block_on(run_web_server(me));
+    thread_pool.block_on(run_web_server(WEB_ADDRS[me], kv_server));
 }