Przeglądaj źródła

Refactor KV server and remove is_retry.

Jing Yang 4 lat temu
rodzic
commit
5b538d60a8
3 zmienionych plików z 28 dodań i 24 usunięć
  1. 6 11
      kvraft/src/client.rs
  2. 7 1
      kvraft/src/common.rs
  3. 15 12
      kvraft/src/server.rs

+ 6 - 11
kvraft/src/client.rs

@@ -7,7 +7,7 @@ use serde::de::DeserializeOwned;
 use serde::Serialize;
 
 use crate::common::{
-    GetArgs, GetReply, KVRaftOptions, PutAppendArgs, PutAppendEnum,
+    GetArgs, GetEnum, GetReply, KVRaftOptions, PutAppendArgs, PutAppendEnum,
     PutAppendReply, UniqueIdSequence, GET, PUT_APPEND,
 };
 use crate::common::{KVError, ValidReply};
@@ -95,22 +95,16 @@ impl ClerkInner {
         loop {
             let args = GetArgs {
                 key: "".to_string(),
+                op: GetEnum::NoDuplicate,
                 unique_id: self.unique_id.zero(),
             };
             let reply: Option<GetReply> = self.call_rpc(GET, args, Some(1));
             if let Some(reply) = reply {
                 match reply.result {
                     Ok(_) => {
-                        if !reply.is_retry {
-                            // Discard the used unique_id.
-                            self.unique_id.inc();
-                            break;
-                        } else {
-                            // The RPC was successful, but the server has had an
-                            // exact same entry, which means someone else has taken
-                            // that clerk_id.
-                            self.unique_id = UniqueIdSequence::new();
-                        }
+                        // Discard the used unique_id.
+                        self.unique_id.inc();
+                        break;
                     }
                     Err(KVError::Expired) | Err(KVError::Conflict) => {
                         // The client ID happens to be re-used. The request does
@@ -190,6 +184,7 @@ impl ClerkInner {
     ) -> Option<Option<String>> {
         let args = GetArgs {
             key,
+            op: GetEnum::AllowDuplicate,
             unique_id: self.unique_id.inc(),
         };
         let reply: GetReply = self.call_rpc(GET, args, options.max_retry)?;

+ 7 - 1
kvraft/src/common.rs

@@ -81,9 +81,16 @@ pub struct PutAppendReply {
     pub result: Result<(), KVError>,
 }
 
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub enum GetEnum {
+    AllowDuplicate,
+    NoDuplicate,
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct GetArgs {
     pub key: String,
+    pub op: GetEnum,
 
     pub unique_id: UniqueId,
 }
@@ -91,7 +98,6 @@ pub struct GetArgs {
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct GetReply {
     pub result: Result<Option<String>, KVError>,
-    pub is_retry: bool,
 }
 
 #[derive(Clone, Debug, Default)]

+ 15 - 12
kvraft/src/server.rs

@@ -10,7 +10,7 @@ use parking_lot::{Condvar, Mutex};
 use ruaft::{ApplyCommandMessage, Persister, Raft, RpcClient, Term};
 
 use crate::common::{
-    ClerkId, GetArgs, GetReply, KVError, PutAppendArgs, PutAppendEnum,
+    ClerkId, GetArgs, GetEnum, GetReply, KVError, PutAppendArgs, PutAppendEnum,
     PutAppendReply, UniqueId,
 };
 use crate::snapshot_holder::SnapshotHolder;
@@ -345,27 +345,30 @@ impl KVServer {
     const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
 
     pub fn get(&self, args: GetArgs) -> GetReply {
-        let (is_retry, result) = match self.block_for_commit(
+        let map_dup = match args.op {
+            GetEnum::AllowDuplicate => |r| Ok(r),
+            GetEnum::NoDuplicate => |_| Err(KVError::Conflict),
+        };
+        let result = match self.block_for_commit(
             args.unique_id,
             KVOp::Get(args.key),
             Self::DEFAULT_TIMEOUT,
         ) {
-            Ok(result) => (false, result),
-            Err(CommitError::Duplicate(result)) => (true, result),
-            Err(CommitError::NotMe(result)) => (true, result),
-            Err(e) => {
-                return GetReply {
-                    result: Err(e.into()),
-                    is_retry: false,
-                }
-            }
+            Ok(result) => Ok(result),
+            Err(CommitError::Duplicate(result)) => map_dup(result),
+            Err(CommitError::NotMe(result)) => map_dup(result),
+            Err(e) => Err(e.into()),
+        };
+        let result = match result {
+            Ok(result) => result,
+            Err(e) => return GetReply { result: Err(e) },
         };
         let result = match result {
             CommitResult::Get(result) => Ok(result),
             CommitResult::Put => Err(KVError::Conflict),
             CommitResult::Append => Err(KVError::Conflict),
         };
-        GetReply { result, is_retry }
+        GetReply { result }
     }
 
     pub fn put_append(&self, args: PutAppendArgs) -> PutAppendReply {