Jing Yang 3 лет назад
Родитель
Сommit
23f974ecd6
3 измененных файлов с 117 добавлено и 7 удалено
  1. 2 0
      durio/Cargo.toml
  2. 2 0
      durio/run_smoke_test.sh
  3. 113 7
      durio/src/main.rs

+ 2 - 0
durio/Cargo.toml

@@ -32,3 +32,5 @@ tokio = { version = "1.7", features = ["macros", "rt-multi-thread", "time", "par
 tokio-serde = { version = "0.8", features = ["json"] }
 
 [dev-dependencies]
+reqwest = { version = "0.11", features = ["json"] }
+env_logger = "0.9.0"

+ 2 - 0
durio/run_smoke_test.sh

@@ -0,0 +1,2 @@
+#!/bin/bash
+RUST_LOG=ruaft=debug,kvraft=debug cargo test -- --nocapture

+ 113 - 7
durio/src/main.rs

@@ -46,10 +46,14 @@ lazy_static! {
     ];
 }
 
-async fn run_web_server(socket_addr: SocketAddr, kv_server: Arc<KVServer>) {
+async fn run_web_server(
+    socket_addr: SocketAddr,
+    kv_server: Arc<KVServer>,
+    kv_addrs: Vec<SocketAddr>,
+) {
     let app = axum::Router::new();
     let app = app.route(
-        "kvstore/is_leader",
+        "/kvstore/is_leader",
         axum::routing::get(move || {
             let kv_server = kv_server.clone();
             async move {
@@ -60,12 +64,12 @@ async fn run_web_server(socket_addr: SocketAddr, kv_server: Arc<KVServer>) {
         }),
     );
 
-    let clerk = Arc::new(create_async_clerk(KV_ADDRS.clone()).await);
+    let clerk = Arc::new(create_async_clerk(kv_addrs).await);
     let counter = Arc::new(AtomicUsize::new(0));
 
     let get_clerk = clerk.clone();
     let app = app.route(
-        "kvstore/get/:key",
+        "/kvstore/get/:key",
         axum::routing::get(move |Path(key): Path<String>| {
             let counter = counter.fetch_add(1, Ordering::Relaxed).to_string();
             let get_clerk = get_clerk.clone();
@@ -80,7 +84,7 @@ async fn run_web_server(socket_addr: SocketAddr, kv_server: Arc<KVServer>) {
 
     let put_clerk = clerk.clone();
     let app = app.route(
-        "kvstore/put",
+        "/kvstore/put",
         axum::routing::post(move |Json(body): Json<PutAppendBody>| {
             let put_clerk = put_clerk.clone();
             async move {
@@ -93,7 +97,7 @@ async fn run_web_server(socket_addr: SocketAddr, kv_server: Arc<KVServer>) {
 
     let append_clerk = clerk.clone();
     let app = app.route(
-        "kvstore/append",
+        "/kvstore/append",
         axum::routing::post(move |Json(body): Json<PutAppendBody>| {
             let append_clerk = append_clerk.clone();
             async move {
@@ -150,5 +154,107 @@ fn main() {
         .build()
         .expect("Creating thread pool should not fail");
 
-    thread_pool.block_on(run_web_server(WEB_ADDRS[me], kv_server));
+    thread_pool.block_on(run_web_server(
+        WEB_ADDRS[me],
+        kv_server,
+        KV_ADDRS.clone(),
+    ));
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::time::Duration;
+
+    #[tokio::test]
+    async fn smoke_test() {
+        let kv_addrs: Vec<SocketAddr> = vec![
+            ([0, 0, 0, 0], 9986).into(),
+            ([0, 0, 0, 0], 9987).into(),
+            ([0, 0, 0, 0], 9988).into(),
+        ];
+        let raft_addrs: Vec<SocketAddr> = vec![
+            ([0, 0, 0, 0], 10006).into(),
+            ([0, 0, 0, 0], 10007).into(),
+            ([0, 0, 0, 0], 10008).into(),
+        ];
+        let web_addrs: Vec<SocketAddr> = vec![
+            ([0, 0, 0, 0], 9006).into(),
+            ([0, 0, 0, 0], 9007).into(),
+            ([0, 0, 0, 0], 9008).into(),
+        ];
+        env_logger::init();
+
+        // KV servers must be created before the web frontend can be run.
+        let mut kv_servers = vec![];
+        for me in 0..3 {
+            let kv_server =
+                run_kv_instance(kv_addrs[me], raft_addrs.clone(), me)
+                    .await
+                    .expect("Running kv instance should not fail");
+            kv_servers.push(kv_server);
+        }
+
+        // All servers at `kv_addrs` must be up.
+        for (me, kv_server) in kv_servers.into_iter().enumerate() {
+            tokio::spawn(run_web_server(
+                web_addrs[me],
+                kv_server,
+                kv_addrs.clone(),
+            ));
+        }
+        tokio::time::sleep(Duration::from_millis(500)).await;
+
+        let mut leader_count = 0;
+        for i in 0..3 {
+            let url = format!(
+                "http://localhost:{}/kvstore/is_leader",
+                web_addrs[i].port()
+            );
+            let is_leader = reqwest::get(url)
+                .await
+                .expect("HTTP request should not fail")
+                .text()
+                .await
+                .expect("Results should be string");
+            println!("is_leader: {}", is_leader);
+            if is_leader.contains("true") {
+                leader_count += 1;
+            }
+        }
+        assert_eq!(1, leader_count);
+
+        let body = PutAppendBody {
+            key: "hello".to_owned(),
+            value: "world".to_owned(),
+        };
+
+        for i in 0..3 {
+            let client = reqwest::Client::new();
+            client
+                .post(format!(
+                    "http://localhost:{}/kvstore/put",
+                    web_addrs[i].port()
+                ))
+                .json(&body)
+                .send()
+                .await
+                .expect("HTTP request should not fail");
+        }
+
+        for i in 0..3 {
+            let url = format!(
+                "http://localhost:{}/kvstore/get/{}",
+                web_addrs[i].port(),
+                "hello"
+            );
+            let result = reqwest::get(url)
+                .await
+                .expect("HTTP request should not fail")
+                .text()
+                .await
+                .expect("hi");
+            assert_eq!("hello!0!world", result);
+        }
+    }
 }