Quellcode durchsuchen

Add get method to server.

Jing Yang vor 5 Jahren
Ursprung
Commit
b76ce9e862
3 geänderte Dateien mit 52 neuen und 12 gelöschten Zeilen
  1. 2 3
      tests/kvraft/client.rs
  2. 7 5
      tests/kvraft/common.rs
  3. 43 4
      tests/kvraft/server.rs

+ 2 - 3
tests/kvraft/client.rs

@@ -1,5 +1,5 @@
 use super::common::{
-    GetArgs, GetReply, KVRaftOptions, KvError, PutAppendArgs, PutAppendReply,
+    GetArgs, GetReply, KVError, KVRaftOptions, PutAppendArgs, PutAppendReply,
     UniqueIdSequence, GET, PUT_APPEND,
 };
 use crate::kvraft::common::PutAppendEnum;
@@ -143,8 +143,7 @@ impl ClerkInner {
         };
         let reply: GetReply = self.call_rpc(GET, args, options.max_retry)?;
         match reply.result {
-            Ok(val) => Some(val),
-            Err(KvError::NoKey) => Some(Default::default()),
+            Ok(val) => val,
             _ => None,
         }
     }

+ 7 - 5
tests/kvraft/common.rs

@@ -68,14 +68,16 @@ pub struct PutAppendArgs {
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
-pub enum KvError {
-    NoKey,
-    Other(String),
+pub enum KVError {
+    NotLeader,
+    Expired,
+    TimedOut,
+    Conflict,
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct PutAppendReply {
-    pub result: Result<(), KvError>,
+    pub result: Result<(), KVError>,
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -87,7 +89,7 @@ pub struct GetArgs {
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct GetReply {
-    pub result: Result<String, KvError>,
+    pub result: Result<Option<String>, KVError>,
     pub is_retry: bool,
 }
 

+ 43 - 4
tests/kvraft/server.rs

@@ -1,4 +1,5 @@
 use super::common::{ClerkId, UniqueId};
+use crate::kvraft::common::{GetArgs, GetReply, KVError};
 use parking_lot::{Condvar, Mutex};
 use ruaft::{Persister, Raft, RpcClient};
 use std::collections::HashMap;
@@ -64,21 +65,34 @@ enum KVOpStep {
     Done(CommitResult),
 }
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 enum CommitResult {
-    Get(String),
+    Get(Option<String>),
     Put,
     Append,
 }
 
+#[derive(Debug)]
 enum CommitError {
     NotLeader,
     Expired(UniqueId),
-    Timeout,
+    TimedOut,
     Conflict,
     Duplicate(CommitResult),
 }
 
+impl From<CommitError> for KVError {
+    fn from(err: CommitError) -> Self {
+        match err {
+            CommitError::NotLeader => KVError::NotLeader,
+            CommitError::Expired(_) => KVError::Expired,
+            CommitError::TimedOut => KVError::TimedOut,
+            CommitError::Conflict => KVError::Conflict,
+            CommitError::Duplicate(_) => panic!("Duplicate is not a KVError"),
+        }
+    }
+}
+
 impl KVServer {
     pub fn new(
         servers: Vec<RpcClient>,
@@ -194,8 +208,33 @@ impl KVServer {
                 Err(CommitError::Duplicate(result.clone()))
             }
         } else {
-            Err(CommitError::Timeout)
+            Err(CommitError::TimedOut)
+        };
+    }
+
+    const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2);
+
+    pub fn get(&self, args: GetArgs) -> GetReply {
+        let (is_retry, result) = match self.block_for_commit(
+            args.unique_id,
+            KVOp::Get(GetOp { key: args.key }),
+            Self::DEFAULT_TIMEOUT,
+        ) {
+            Ok(result) => (false, result),
+            Err(CommitError::Duplicate(result)) => (true, result),
+            Err(e) => {
+                return GetReply {
+                    result: Err(e.into()),
+                    is_retry: false,
+                }
+            }
+        };
+        let result = match result {
+            CommitResult::Get(result) => Ok(result),
+            CommitResult::Put => Err(KVError::Conflict),
+            CommitResult::Append => Err(KVError::Conflict),
         };
+        GetReply { result, is_retry }
     }
 
     pub fn kill(self) {