Procházet zdrojové kódy

Improve client side retrying.

1. Retry once at least for each server.
2. Consider NotLeader as invalid result.
3. Move to next server if RPC fails or the result is invalid.
Jing Yang před 4 roky
rodič
revize
137de21f0a
2 změnil soubory, kde provedl 42 přidání a 7 odebrání
  1. 20 7
      kvraft/src/client.rs
  2. 22 0
      kvraft/src/common.rs

+ 20 - 7
kvraft/src/client.rs

@@ -2,6 +2,7 @@ use super::common::{
     GetArgs, GetReply, KVRaftOptions, PutAppendArgs, PutAppendEnum,
     PutAppendReply, UniqueIdSequence, GET, PUT_APPEND,
 };
+use common::ValidReply;
 use labrpc::{Client, RequestMessage};
 use serde::de::DeserializeOwned;
 use serde::Serialize;
@@ -92,31 +93,43 @@ impl ClerkInner {
         }
     }
 
-    fn call_rpc<M: AsRef<str>, A: Clone + Serialize, R: DeserializeOwned>(
+    fn call_rpc<M, A, R>(
         &mut self,
         method: M,
         args: A,
         max_retry: Option<usize>,
-    ) -> Option<R> {
+    ) -> Option<R>
+    where
+        M: AsRef<str>,
+        A: Serialize,
+        R: DeserializeOwned + ValidReply,
+    {
         let method = method.as_ref().to_owned();
         let data = RequestMessage::from(
             bincode::serialize(&args)
                 .expect("Serialization of requests should not fail"),
         );
 
-        for _ in 0..max_retry.unwrap_or(usize::MAX) {
-            let index = self.last_server_index.load(Ordering::Relaxed);
+        let max_retry =
+            std::cmp::max(max_retry.unwrap_or(usize::MAX), self.servers.len());
+
+        let mut index = self.last_server_index.load(Ordering::Relaxed);
+        for _ in 0..max_retry {
             let client = &self.servers[index];
 
             let reply = self
                 .executor
                 .block_on(client.call_rpc(method.clone(), data.clone()));
             if let Ok(reply) = reply {
-                let ret = bincode::deserialize(reply.as_ref())
+                let ret: R = bincode::deserialize(reply.as_ref())
                     .expect("Deserialization of reply should not fail");
-                self.last_server_index.store(index, Ordering::Relaxed);
-                return Some(ret);
+                if ret.is_reply_valid() {
+                    self.last_server_index.store(index, Ordering::Relaxed);
+                    return Some(ret);
+                }
             }
+            index += 1;
+            index %= self.servers.len();
         }
         None
     }

+ 22 - 0
kvraft/src/common.rs

@@ -97,3 +97,25 @@ pub struct GetReply {
 pub struct KVRaftOptions {
     pub max_retry: Option<usize>,
 }
+
+pub trait ValidReply {
+    fn is_reply_valid(&self) -> bool;
+}
+
+impl ValidReply for PutAppendReply {
+    fn is_reply_valid(&self) -> bool {
+        if let Err(KVError::NotLeader) = &self.result {
+            return false;
+        }
+        return true;
+    }
+}
+
+impl ValidReply for GetReply {
+    fn is_reply_valid(&self) -> bool {
+        if let Err(KVError::NotLeader) = &self.result {
+            return false;
+        }
+        return true;
+    }
+}