Преглед изворни кода

Add RPC timeout to client.

Jing Yang пре 4 година
родитељ
комит
078f317993
1 измењених фајлова са 13 додато и 4 уклоњено
  1. 13 4
      kvraft/src/client.rs

+ 13 - 4
kvraft/src/client.rs

@@ -1,5 +1,6 @@
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Once;
+use std::time::Duration;
 
 use labrpc::{Client, RequestMessage};
 use serde::de::DeserializeOwned;
@@ -87,6 +88,7 @@ impl ClerkInner {
             executor: tokio::runtime::Builder::new_multi_thread()
                 .thread_name("kvraft-clerk")
                 .worker_threads(1)
+                .enable_time()
                 .build()
                 .expect("Creating thread pool should not fail"),
         }
@@ -148,10 +150,17 @@ impl ClerkInner {
         let mut index = self.last_server_index.load(Ordering::Relaxed);
         for _ in 0..max_retry {
             let client = &self.servers[index];
-
-            let reply = self
-                .executor
-                .block_on(client.call_rpc(method.clone(), data.clone()));
+            let rpc_response = self.executor.block_on(async {
+                tokio::time::timeout(
+                    Duration::from_secs(1),
+                    client.call_rpc(method.clone(), data.clone()),
+                )
+                .await
+            });
+            let reply = match rpc_response {
+                Ok(reply) => reply,
+                Err(e) => Err(e.into()),
+            };
             if let Ok(reply) = reply {
                 let ret: R = bincode::deserialize(reply.as_ref())
                     .expect("Deserialization of reply should not fail");