Browse Source

Support running multiple instances of durio.

1. Specify different ports for them.
2. Run the web server at different ports.
3. Added an is_leader servlet.
Jing Yang 4 years ago
parent
commit
4c04d3d59b
2 changed files with 38 additions and 8 deletions
  1. 1 0
      durio/Cargo.toml
  2. 37 8
      durio/src/main.rs

+ 1 - 0
durio/Cargo.toml

@@ -17,6 +17,7 @@ async-trait = "0.1"
 bytes = "1.0"
 futures-util = "0.3.15"
 kvraft = { path = "../kvraft" }
+lazy_static = "1.4.0"
 parking_lot = "0.11.1"
 ruaft = { path = "..", features = ["integration-test"] }
 serde = "1.0"

+ 37 - 8
durio/src/main.rs

@@ -1,7 +1,9 @@
 use std::convert::Infallible;
+use std::net::SocketAddr;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 
+use lazy_static::lazy_static;
 use serde_derive::{Deserialize, Serialize};
 use warp::Filter;
 
@@ -18,12 +20,29 @@ struct PutAppendBody {
     value: String,
 }
 
-async fn run_web_server() {
-    let kv_addr = ([127, 0, 0, 1], 9988).into();
-    let raft_addr = ([127, 0, 0, 1], 10001).into();
-    let kv_server = run_kv_instance(kv_addr, vec![raft_addr], 0)
+lazy_static! {
+    static ref KV_ADDRS: Vec<SocketAddr> = vec![
+        ([127, 0, 0, 1], 9986).into(),
+        ([127, 0, 0, 1], 9987).into(),
+        ([127, 0, 0, 1], 9988).into(),
+    ];
+    static ref RAFT_ADDRS: Vec<SocketAddr> = vec![
+        ([127, 0, 0, 1], 10006).into(),
+        ([127, 0, 0, 1], 10007).into(),
+        ([127, 0, 0, 1], 10008).into(),
+    ];
+    static ref WEB_ADDRS: Vec<SocketAddr> = vec![
+        ([0, 0, 0, 0], 9006).into(),
+        ([0, 0, 0, 0], 9007).into(),
+        ([0, 0, 0, 0], 9008).into(),
+    ];
+}
+
+async fn run_web_server(me: usize) {
+    let kv_server = run_kv_instance(KV_ADDRS[me], RAFT_ADDRS.clone(), me)
         .await
         .expect("Running kv instance should not fail");
+    let is_leader_kv_server = kv_server.clone();
 
     let try_get = warp::path::param().and_then(move |_: u32| {
         let kv_server = kv_server.clone();
@@ -42,6 +61,8 @@ async fn run_web_server() {
             result
         }
     });
+    let is_leader = warp::path!("kvstore" / "is_leader")
+        .map(move || format!("{:?}", is_leader_kv_server.raft().get_state()));
 
     let counter = Arc::new(AtomicUsize::new(0));
     let counter_clone = counter.clone();
@@ -60,12 +81,20 @@ async fn run_web_server() {
         .and(warp::body::json())
         .map(|key, _body: PutAppendBody| warp::reply::json(&key));
 
-    let routes = warp::get().and(get.or(put).or(append).or(try_get));
-    warp::serve(routes).run(([0, 0, 0, 0], 9090)).await;
+    let routes =
+        warp::get().and(get.or(put).or(append).or(is_leader).or(try_get));
+    warp::serve(routes).run(WEB_ADDRS[me]).await;
 }
 
 fn main() {
-    test_utils::init_log("durio").expect("Initiating log should not fail");
+    let me: usize = std::env::args()
+        .skip(1)
+        .next()
+        .unwrap_or_default()
+        .parse()
+        .expect("An index of the current instance must be passed in");
+    test_utils::init_log(format!("durio-instance-{}", me).as_str())
+        .expect("Initiating log should not fail");
     let local_logger = test_utils::thread_local_logger::get();
 
     let thread_pool = tokio::runtime::Builder::new_multi_thread()
@@ -77,5 +106,5 @@ fn main() {
         .build()
         .expect("Creating thread pool should not fail");
 
-    thread_pool.block_on(run_web_server());
+    thread_pool.block_on(run_web_server(me));
 }