Bladeren bron

Try commit a value in the kv store when receiving a GET request.

Jing Yang 4 jaren geleden
bovenliggende
commit
bf2bc68c51
4 gewijzigde bestanden met toevoegingen van 44 en 9 verwijderingen
  1. 1 1
      durio/Cargo.toml
  2. 38 5
      durio/src/main.rs
  3. 4 2
      durio/src/run.rs
  4. 1 1
      kvraft/src/lib.rs

+ 1 - 1
durio/Cargo.toml

@@ -23,7 +23,7 @@ serde = "1.0"
 serde_derive = "1.0"
 serde_json = "1.0"
 tarpc = { version = "0.27", features = ["serde-transport", "tcp"] }
-test_utils = { path = "../test_utils" }
+test_utils = { path = "../test_utils", default-features = false }
 tokio = { version = "1.7", features = ["macros", "rt-multi-thread", "time", "parking_lot"] }
 tokio-serde = { version = "0.8", features = ["json"] }
 warp = "0.3"

+ 38 - 5
durio/src/main.rs

@@ -1,3 +1,4 @@
+use std::convert::Infallible;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 
@@ -17,15 +18,31 @@ struct PutAppendBody {
     value: String,
 }
 
-#[tokio::main]
-async fn main() {
-    test_utils::init_log("durio").expect("Initiating log should not fail");
+async fn run_web_server() {
     let kv_addr = ([127, 0, 0, 1], 9988).into();
     let raft_addr = ([127, 0, 0, 1], 10001).into();
-    run_kv_instance(kv_addr, vec![raft_addr], 0)
+    let kv_server = run_kv_instance(kv_addr, vec![raft_addr], 0)
         .await
         .expect("Running kv instance should not fail");
 
+    let try_get = warp::path::param().and_then(move |_: u32| {
+        let kv_server = kv_server.clone();
+        async move {
+            let value = kv_server
+                .get(kvraft::GetArgs {
+                    key: "".to_string(),
+                    op: kvraft::GetEnum::AllowDuplicate,
+                    unique_id: Default::default(),
+                })
+                .await
+                .result
+                .expect("Get should not fail");
+            let result: Result<String, Infallible> =
+                Ok(value.unwrap_or_default());
+            result
+        }
+    });
+
     let counter = Arc::new(AtomicUsize::new(0));
     let counter_clone = counter.clone();
     let get = warp::path!("kvstore" / "get" / String).map(move |key| {
@@ -43,6 +60,22 @@ async fn main() {
         .and(warp::body::json())
         .map(|key, _body: PutAppendBody| warp::reply::json(&key));
 
-    let routes = warp::get().and(get.or(put).or(append));
+    let routes = warp::get().and(get.or(put).or(append).or(try_get));
     warp::serve(routes).run(([0, 0, 0, 0], 9090)).await;
 }
+
+fn main() {
+    test_utils::init_log("durio").expect("Initiating log should not fail");
+    let local_logger = test_utils::thread_local_logger::get();
+
+    let thread_pool = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .thread_name("durio")
+        .on_thread_start(move || {
+            test_utils::thread_local_logger::set(local_logger.clone())
+        })
+        .build()
+        .expect("Creating thread pool should not fail");
+
+    thread_pool.block_on(run_web_server());
+}

+ 4 - 2
durio/src/run.rs

@@ -13,7 +13,7 @@ pub(crate) async fn run_kv_instance(
     addr: SocketAddr,
     raft_peers: Vec<SocketAddr>,
     me: usize,
-) -> std::io::Result<()> {
+) -> std::io::Result<Arc<KVServer>> {
     let mut remote_rafts = vec![];
     for (index, raft_peer) in raft_peers.iter().enumerate() {
         let remote_raft = if index == me {
@@ -30,5 +30,7 @@ pub(crate) async fn run_kv_instance(
     let raft = Arc::new(kv_server.raft().clone());
 
     start_raft_service_server(raft_peers[me], raft).await?;
-    start_kv_service_server(addr, kv_server).await
+    start_kv_service_server(addr, kv_server.clone()).await?;
+
+    Ok(kv_server)
 }

+ 1 - 1
kvraft/src/lib.rs

@@ -1,5 +1,5 @@
 pub use client::Clerk;
-pub use common::{GetArgs, GetReply, PutAppendArgs, PutAppendReply};
+pub use common::{GetArgs, GetEnum, GetReply, PutAppendArgs, PutAppendReply};
 pub use remote_kvraft::RemoteKvraft;
 pub use server::KVServer;
 pub use server::UniqueKVOp;