Parcourir la source

Use the new AsyncClerk in Durio.

The previous daemon thread is no longer needed!
Jing Yang il y a 3 ans
Parent
commit
09af46a746
3 fichiers modifiés avec 64 ajouts et 156 suppressions
  1. 31 3
      durio/src/kv_service.rs
  2. 33 28
      durio/src/main.rs
  3. 0 125
      durio/src/one_clerk.rs

+ 31 - 3
durio/src/kv_service.rs

@@ -6,8 +6,8 @@ use async_trait::async_trait;
 use tarpc::context::Context;
 
 use kvraft::{
-    CommitSentinelArgs, CommitSentinelReply, GetArgs, GetReply, KVServer,
-    PutAppendArgs, PutAppendReply, RemoteKvraft,
+    AsyncClerk, CommitSentinelArgs, CommitSentinelReply, GetArgs, GetReply,
+    KVServer, PutAppendArgs, PutAppendReply, RemoteKvraft,
 };
 
 #[tarpc::service]
@@ -70,7 +70,6 @@ impl RemoteKvraft for KVServiceClient {
     }
 }
 
-#[allow(dead_code)]
 pub(crate) async fn connect_to_kv_service(
     addr: SocketAddr,
 ) -> std::io::Result<KVServiceClient> {
@@ -84,6 +83,35 @@ pub(crate) async fn connect_to_kv_service(
     Ok(client)
 }
 
+pub async fn connect_to_kv_services(
+    socket_addrs: Vec<SocketAddr>,
+) -> Vec<impl RemoteKvraft> {
+    log::info!("Starting clerk creation ...");
+    let mut clients = vec![None; socket_addrs.len()];
+    while clients.iter().filter(|e| e.is_none()).count() != 0 {
+        for (index, socket_addr) in socket_addrs.iter().enumerate() {
+            if clients[index].is_some() {
+                continue;
+            }
+            let result = connect_to_kv_service(*socket_addr).await;
+            match result {
+                Ok(client) => clients[index] = Some(client),
+                Err(e) => {
+                    log::error!("Error connecting to {:?}: {}", socket_addr, e)
+                }
+            }
+        }
+        log::info!("Clerk clients are {:?}", clients);
+    }
+
+    log::info!("Done clerk creation ...");
+    clients.into_iter().map(|e| e.unwrap()).collect()
+}
+
+pub async fn create_async_clerk(socket_addrs: Vec<SocketAddr>) -> AsyncClerk {
+    AsyncClerk::new(connect_to_kv_services(socket_addrs).await)
+}
+
 pub(crate) fn start_kv_service_server(
     addr: SocketAddr,
     kv_server: Arc<KVServer>,

+ 33 - 28
durio/src/main.rs

@@ -1,3 +1,4 @@
+use std::convert::Infallible;
 use std::net::SocketAddr;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
@@ -7,11 +8,10 @@ use lazy_static::lazy_static;
 use serde_derive::{Deserialize, Serialize};
 use warp::Filter;
 
-use crate::one_clerk::create_clerk;
+use crate::kv_service::create_async_clerk;
 use crate::run::run_kv_instance;
 
 mod kv_service;
-mod one_clerk;
 mod persister;
 mod raft_service;
 mod run;
@@ -45,28 +45,32 @@ lazy_static! {
     ];
 }
 
-const NOT_READY: &str = "Clerk is not ready";
-
 async fn run_web_server(socket_addr: SocketAddr, kv_server: Arc<KVServer>) {
     let is_leader = warp::get()
         .and(warp::path!("kvstore" / "is_leader"))
-        .map(move || format!("{:?}", kv_server.raft().get_state()));
+        .and_then(move || {
+            let kv_server = kv_server.clone();
+            async move {
+                let ret: Result<String, Infallible> =
+                    Ok(format!("{:?}", kv_server.raft().get_state()));
+                ret
+            }
+        });
 
+    let clerk = Arc::new(create_async_clerk(KV_ADDRS.clone()).await);
     let counter = Arc::new(AtomicUsize::new(0));
-    let counter_2 = counter.clone();
-    let counter_3 = counter.clone();
-    let clerk = create_clerk(KV_ADDRS.clone());
 
     let get_clerk = clerk.clone();
     let get = warp::get()
         .and(warp::path!("kvstore" / "get" / String))
-        .map(move |key: String| {
+        .and_then(move |key: String| {
             let counter = counter.fetch_add(1, Ordering::SeqCst).to_string();
-            match get_clerk.get(key.clone()) {
-                Some(value) => {
-                    key + "!" + counter.as_str() + "!" + value.as_str()
-                }
-                None => NOT_READY.to_string(),
+            let get_clerk = get_clerk.clone();
+            async move {
+                let value = get_clerk.get(&key).await.unwrap_or_default();
+                let ret: Result<String, Infallible> =
+                    Ok(key + "!" + counter.as_str() + "!" + value.as_str());
+                ret
             }
         });
 
@@ -74,25 +78,26 @@ async fn run_web_server(socket_addr: SocketAddr, kv_server: Arc<KVServer>) {
     let put = warp::post()
         .and(warp::path!("kvstore" / "put"))
         .and(warp::body::json())
-        .map(move |body: PutAppendBody| {
-            counter_2.fetch_add(1, Ordering::SeqCst);
-            let code = match put_clerk.put(body.key, body.value) {
-                None => warp::http::StatusCode::SERVICE_UNAVAILABLE,
-                Some(_) => warp::http::StatusCode::OK,
-            };
-            warp::reply::with_status(warp::reply(), code)
+        .and_then(move |body: PutAppendBody| {
+            let put_clerk = put_clerk.clone();
+            async move {
+                put_clerk.put(body.key, body.value).await;
+                let ret: Result<String, Infallible> = Ok("OK".to_string());
+                ret
+            }
         });
+
     let append_clerk = clerk.clone();
     let append = warp::post()
         .and(warp::path!("kvstore" / "append"))
         .and(warp::body::json())
-        .map(move |body: PutAppendBody| {
-            counter_3.fetch_add(1, Ordering::SeqCst);
-            let code = match append_clerk.append(body.key, body.value) {
-                None => warp::http::StatusCode::SERVICE_UNAVAILABLE,
-                Some(_) => warp::http::StatusCode::OK,
-            };
-            warp::reply::with_status(warp::reply(), code)
+        .and_then(move |body: PutAppendBody| {
+            let append_clerk = append_clerk.clone();
+            async move {
+                append_clerk.append(body.key, body.value).await;
+                let ret: Result<String, Infallible> = Ok("OK".to_string());
+                ret
+            }
         });
 
     let routes = is_leader.or(get).or(put).or(append);

+ 0 - 125
durio/src/one_clerk.rs

@@ -1,125 +0,0 @@
-use std::net::SocketAddr;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::mpsc::Sender;
-use std::sync::Arc;
-
-use crate::kv_service::connect_to_kv_service;
-use kvraft::Clerk;
-
-pub(crate) fn create_clerk(socket_addrs: Vec<SocketAddr>) -> OneClerk {
-    OneClerk::create(socket_addrs)
-}
-
-#[derive(Clone)]
-pub(crate) struct OneClerk {
-    ready: Arc<AtomicBool>,
-    requests: crossbeam_channel::Sender<(ClerkRequest, Sender<String>)>,
-}
-
-enum ClerkRequest {
-    Get(String),
-    Put(String, String),
-    Append(String, String),
-}
-
-impl OneClerk {
-    pub(crate) fn create(socket_addrs: Vec<SocketAddr>) -> Self {
-        let ready = Arc::new(AtomicBool::new(false));
-        // Create a thread that blocks on all requests to the clerk.
-        let requests = Self::run_clerk_thread(socket_addrs, ready.clone());
-
-        OneClerk { ready, requests }
-    }
-
-    async fn initialize_clerk(socket_addrs: Vec<SocketAddr>) -> Clerk {
-        log::info!("Starting clerk creation ...");
-        let mut clients = vec![None; socket_addrs.len()];
-        while clients.iter().filter(|e| e.is_none()).count() != 0 {
-            for (index, socket_addr) in socket_addrs.iter().enumerate() {
-                let result = connect_to_kv_service(*socket_addr).await;
-                match result {
-                    Ok(client) => clients[index] = Some(client),
-                    Err(e) => log::error!(
-                        "Error connecting to {:?}: {}",
-                        socket_addr,
-                        e
-                    ),
-                }
-            }
-            log::info!("Clerk clients are {:?}", clients);
-        }
-
-        log::info!("Done clerk creation ...");
-        let clients = clients.into_iter().map(|e| e.unwrap()).collect();
-        Clerk::new(clients)
-    }
-
-    /// A thread must be created for get requests. We cannot run the blocking
-    /// Clerk functions on tokio thread pool threads.
-    fn run_clerk_thread(
-        socket_addrs: Vec<SocketAddr>,
-        ready: Arc<AtomicBool>,
-    ) -> crossbeam_channel::Sender<(ClerkRequest, Sender<String>)> {
-        let local_logger =
-            test_utils::thread_local_logger::LocalLogger::inherit();
-
-        // Steal a tokio runtime to run the initializer.
-        let tokio_handle = tokio::runtime::Handle::current();
-        let (tx, rx) =
-            crossbeam_channel::unbounded::<(ClerkRequest, Sender<String>)>();
-        std::thread::spawn(move || {
-            local_logger.attach();
-
-            let mut clerk =
-                tokio_handle.block_on(Self::initialize_clerk(socket_addrs));
-            clerk.init_once();
-
-            ready.store(true, Ordering::Release);
-
-            while let Ok((request, result)) = rx.recv() {
-                let value = match request {
-                    ClerkRequest::Get(key) => {
-                        clerk.get(key).unwrap_or_default()
-                    }
-                    ClerkRequest::Put(key, value) => {
-                        clerk.put(key, value);
-                        String::default()
-                    }
-                    ClerkRequest::Append(key, value) => {
-                        clerk.append(key, value);
-                        String::default()
-                    }
-                };
-                let _ = result.send(value);
-            }
-        });
-        tx
-    }
-
-    fn request(&self, request: ClerkRequest) -> Option<String> {
-        if !self.ready.load(Ordering::Acquire) {
-            return None;
-        }
-
-        let (result_tx, result_rx) = std::sync::mpsc::channel();
-        self.requests
-            .send((request, result_tx))
-            .expect("Send get request should not fail");
-        let value = result_rx
-            .recv()
-            .expect("Receiving get response should not fail");
-        Some(value)
-    }
-
-    pub(crate) fn get(&self, key: String) -> Option<String> {
-        self.request(ClerkRequest::Get(key))
-    }
-
-    pub(crate) fn put(&self, key: String, value: String) -> Option<String> {
-        self.request(ClerkRequest::Put(key, value))
-    }
-
-    pub(crate) fn append(&self, key: String, value: String) -> Option<String> {
-        self.request(ClerkRequest::Append(key, value))
-    }
-}