Explorar el Código

Integrate Clerk into the web server.

Clerk is blocking and uses tokio under the hood. A new thread has
been created to block on clerk operations.

No attempt has been made to make the Clerk async, because each clerk
is only supposed to send at most one operation at the same time.
Jing Yang hace 4 años
padre
commit
8232d7d47b
Se han modificado 3 ficheros con 166 adiciones y 33 borrados
  1. 2 0
      durio/Cargo.toml
  2. 39 33
      durio/src/main.rs
  3. 125 0
      durio/src/one_clerk.rs

+ 2 - 0
durio/Cargo.toml

@@ -15,9 +15,11 @@ categories = ["raft"]
 [dependencies]
 async-trait = "0.1"
 bytes = "1.0"
+crossbeam-channel = "0.5.1"
 futures-util = "0.3.15"
 kvraft = { path = "../kvraft" }
 lazy_static = "1.4.0"
+log = "0.4"
 parking_lot = "0.11.1"
 ruaft = { path = "..", features = ["integration-test"] }
 serde = "1.0"

+ 39 - 33
durio/src/main.rs

@@ -1,4 +1,3 @@
-use std::convert::Infallible;
 use std::net::SocketAddr;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
@@ -7,9 +6,11 @@ use lazy_static::lazy_static;
 use serde_derive::{Deserialize, Serialize};
 use warp::Filter;
 
+use crate::one_clerk::create_clerk;
 use crate::run::run_kv_instance;
 
 mod kv_service;
+mod one_clerk;
 mod persister;
 mod raft_service;
 mod run;
@@ -17,6 +18,7 @@ mod utils;
 
 #[derive(Deserialize, Serialize)]
 struct PutAppendBody {
+    key: String,
     value: String,
 }
 
@@ -38,51 +40,55 @@ lazy_static! {
     ];
 }
 
+const NOT_READY: &str = "Clerk is not ready";
+
 async fn run_web_server(me: usize) {
     let kv_server = run_kv_instance(KV_ADDRS[me], RAFT_ADDRS.clone(), me)
         .await
         .expect("Running kv instance should not fail");
-    let is_leader_kv_server = kv_server.clone();
 
-    let try_get = warp::path::param().and_then(move |_: u32| {
-        let kv_server = kv_server.clone();
-        async move {
-            let value = kv_server
-                .get(kvraft::GetArgs {
-                    key: "".to_string(),
-                    op: kvraft::GetEnum::AllowDuplicate,
-                    unique_id: Default::default(),
-                })
-                .await
-                .result
-                .expect("Get should not fail");
-            let result: Result<String, Infallible> =
-                Ok(value.unwrap_or_default());
-            result
-        }
-    });
-    let is_leader = warp::path!("kvstore" / "is_leader")
-        .map(move || format!("{:?}", is_leader_kv_server.raft().get_state()));
+    let is_leader = warp::get()
+        .and(warp::path!("kvstore" / "is_leader"))
+        .map(move || format!("{:?}", kv_server.raft().get_state()));
 
     let counter = Arc::new(AtomicUsize::new(0));
-    let counter_clone = counter.clone();
-    let get = warp::path!("kvstore" / "get" / String).map(move |key| {
-        key + "!" + counter.fetch_add(1, Ordering::SeqCst).to_string().as_str()
-    });
+    let counter_2 = counter.clone();
+    let counter_3 = counter.clone();
+    let clerk = create_clerk(KV_ADDRS.clone());
+
+    let get_clerk = clerk.clone();
+    let get = warp::get()
+        .and(warp::path!("kvstore" / "get" / String))
+        .map(move |key: String| {
+            let counter = counter.fetch_add(1, Ordering::SeqCst).to_string();
+            match get_clerk.get(key.clone()) {
+                Some(value) => {
+                    key + "!" + counter.as_str() + "!" + value.as_str()
+                }
+                None => NOT_READY.to_string(),
+            }
+        });
+
+    let put_clerk = clerk.clone();
     let put = warp::post()
-        .and(warp::path!("kvstore" / "put" / String))
+        .and(warp::path!("kvstore" / "put"))
         .and(warp::body::json())
-        .map(move |key, _body: PutAppendBody| {
-            counter_clone.fetch_add(1, Ordering::SeqCst);
-            warp::reply::json(&key)
+        .map(move |body: PutAppendBody| {
+            counter_2.fetch_add(1, Ordering::SeqCst);
+            put_clerk.put(body.key, body.value);
+            warp::reply::reply()
         });
+    let append_clerk = clerk.clone();
     let append = warp::post()
-        .and(warp::path!("kvstore" / "append" / String))
+        .and(warp::path!("kvstore" / "append"))
         .and(warp::body::json())
-        .map(|key, _body: PutAppendBody| warp::reply::json(&key));
+        .map(move |body: PutAppendBody| {
+            counter_3.fetch_add(1, Ordering::SeqCst);
+            append_clerk.append(body.key, body.value);
+            warp::reply::reply()
+        });
 
-    let routes =
-        warp::get().and(get.or(put).or(append).or(is_leader).or(try_get));
+    let routes = is_leader.or(get).or(put).or(append);
     warp::serve(routes).run(WEB_ADDRS[me]).await;
 }
 

+ 125 - 0
durio/src/one_clerk.rs

@@ -0,0 +1,125 @@
+use std::net::SocketAddr;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::mpsc::Sender;
+use std::sync::Arc;
+
+use crate::kv_service::connect_to_kv_service;
+use kvraft::Clerk;
+
+pub(crate) fn create_clerk(socket_addrs: Vec<SocketAddr>) -> OneClerk {
+    OneClerk::create(socket_addrs)
+}
+
+#[derive(Clone)]
+pub(crate) struct OneClerk {
+    ready: Arc<AtomicBool>,
+    requests: crossbeam_channel::Sender<(ClerkRequest, Sender<String>)>,
+}
+
+enum ClerkRequest {
+    Get(String),
+    Put(String, String),
+    Append(String, String),
+}
+
+impl OneClerk {
+    pub(crate) fn create(socket_addrs: Vec<SocketAddr>) -> Self {
+        let ready = Arc::new(AtomicBool::new(false));
+        // Create a thread that blocks on all requests to the clerk.
+        let requests = Self::run_clerk_thread(socket_addrs, ready.clone());
+
+        OneClerk { ready, requests }
+    }
+
+    async fn initialize_clerk(socket_addrs: Vec<SocketAddr>) -> Clerk {
+        log::info!("Starting clerk creation ...");
+        let mut clients = vec![None; socket_addrs.len()];
+        while clients.iter().filter(|e| e.is_none()).count() != 0 {
+            for (index, socket_addr) in socket_addrs.iter().enumerate() {
+                let result = connect_to_kv_service(socket_addr.clone()).await;
+                match result {
+                    Ok(client) => clients[index] = Some(client),
+                    Err(e) => log::error!(
+                        "Error connecting to {:?}: {}",
+                        socket_addr,
+                        e
+                    ),
+                }
+            }
+            log::info!("Clerk clients are {:?}", clients);
+        }
+
+        log::info!("Done clerk creation ...");
+        let clients = clients.into_iter().map(|e| e.unwrap()).collect();
+        Clerk::new(clients)
+    }
+
+    /// A thread must be created for get requests. We cannot run the blocking
+    /// Clerk functions on tokio thread pool threads.
+    fn run_clerk_thread(
+        socket_addrs: Vec<SocketAddr>,
+        ready: Arc<AtomicBool>,
+    ) -> crossbeam_channel::Sender<(ClerkRequest, Sender<String>)> {
+        let local_logger =
+            test_utils::thread_local_logger::LocalLogger::inherit();
+
+        // Steal a tokio runtime to run the initializer.
+        let tokio_handle = tokio::runtime::Handle::current();
+        let (tx, rx) =
+            crossbeam_channel::unbounded::<(ClerkRequest, Sender<String>)>();
+        std::thread::spawn(move || {
+            local_logger.attach();
+
+            let mut clerk =
+                tokio_handle.block_on(Self::initialize_clerk(socket_addrs));
+            clerk.init_once();
+
+            ready.store(true, Ordering::Release);
+
+            while let Ok((request, result)) = rx.recv() {
+                let value = match request {
+                    ClerkRequest::Get(key) => {
+                        clerk.get(key).unwrap_or_default()
+                    }
+                    ClerkRequest::Put(key, value) => {
+                        clerk.put(key, value);
+                        String::default()
+                    }
+                    ClerkRequest::Append(key, value) => {
+                        clerk.append(key, value);
+                        String::default()
+                    }
+                };
+                let _ = result.send(value);
+            }
+        });
+        return tx;
+    }
+
+    fn request(&self, request: ClerkRequest) -> Option<String> {
+        if !self.ready.load(Ordering::Acquire) {
+            return None;
+        }
+
+        let (result_tx, result_rx) = std::sync::mpsc::channel();
+        self.requests
+            .send((request, result_tx))
+            .expect("Send get request should not fail");
+        let value = result_rx
+            .recv()
+            .expect("Receiving get response should not fail");
+        Some(value)
+    }
+
+    pub(crate) fn get(&self, key: String) -> Option<String> {
+        self.request(ClerkRequest::Get(key))
+    }
+
+    pub(crate) fn put(&self, key: String, value: String) -> Option<String> {
+        self.request(ClerkRequest::Put(key, value))
+    }
+
+    pub(crate) fn append(&self, key: String, value: String) -> Option<String> {
+        self.request(ClerkRequest::Append(key, value))
+    }
+}