Bladeren bron

Create a separate CommitSentinel RPC from the Get RPC.

Created new meessages. Boilerplates everywhere.

Also removed the now unused GetEnum.
Jing Yang 3 jaren geleden
bovenliggende
commit
285b004ccb
7 gewijzigde bestanden met toevoegingen van 86 en 30 verwijderingen
  1. 20 1
      durio/src/kv_service.rs
  2. 9 8
      kvraft/src/client.rs
  3. 16 7
      kvraft/src/common.rs
  4. 4 1
      kvraft/src/lib.rs
  5. 9 1
      kvraft/src/remote_kvraft.rs
  6. 9 10
      kvraft/src/server.rs
  7. 19 2
      test_configs/src/rpcs.rs

+ 20 - 1
durio/src/kv_service.rs

@@ -6,13 +6,15 @@ use async_trait::async_trait;
 use tarpc::context::Context;
 
 use kvraft::{
-    GetArgs, GetReply, KVServer, PutAppendArgs, PutAppendReply, RemoteKvraft,
+    CommitSentinelArgs, CommitSentinelReply, GetArgs, GetReply, KVServer,
+    PutAppendArgs, PutAppendReply, RemoteKvraft,
 };
 
 #[tarpc::service]
 pub(crate) trait KVService {
     async fn get(args: GetArgs) -> GetReply;
     async fn put_append(args: PutAppendArgs) -> PutAppendReply;
+    async fn commit_sentinel(args: CommitSentinelArgs) -> CommitSentinelReply;
 }
 
 #[derive(Clone)]
@@ -31,6 +33,14 @@ impl KVService for KVRpcServer {
     ) -> PutAppendReply {
         self.0.put_append(args).await
     }
+
+    async fn commit_sentinel(
+        self,
+        _context: Context,
+        args: CommitSentinelArgs,
+    ) -> CommitSentinelReply {
+        self.0.commit_sentinel(args).await
+    }
 }
 
 #[async_trait]
@@ -49,6 +59,15 @@ impl RemoteKvraft for KVServiceClient {
             .await
             .map_err(crate::utils::translate_rpc_error)
     }
+
+    async fn commit_sentinel(
+        &self,
+        args: CommitSentinelArgs,
+    ) -> std::io::Result<CommitSentinelReply> {
+        self.commit_sentinel(Context::current(), args)
+            .await
+            .map_err(crate::utils::translate_rpc_error)
+    }
 }
 
 #[allow(dead_code)]

+ 9 - 8
kvraft/src/client.rs

@@ -4,8 +4,9 @@ use std::sync::Once;
 use std::time::Duration;
 
 use crate::common::{
-    GetArgs, GetEnum, GetReply, KVError, KVRaftOptions, PutAppendArgs,
-    PutAppendEnum, PutAppendReply, UniqueIdSequence, ValidReply,
+    CommitSentinelArgs, CommitSentinelReply, GetArgs, GetReply, KVError,
+    KVRaftOptions, PutAppendArgs, PutAppendEnum, PutAppendReply,
+    UniqueIdSequence, ValidReply,
 };
 use crate::RemoteKvraft;
 
@@ -90,13 +91,14 @@ impl ClerkInner {
 
     fn commit_sentinel(&mut self) {
         loop {
-            let args = GetArgs {
-                key: "".to_string(),
-                op: GetEnum::NoDuplicate,
+            let args = CommitSentinelArgs {
                 unique_id: self.unique_id.zero(),
             };
-            let reply: Option<GetReply> =
-                self.retry_rpc(|remote, args| remote.get(args), args, Some(1));
+            let reply: Option<CommitSentinelReply> = self.retry_rpc(
+                |remote, args| remote.commit_sentinel(args),
+                args,
+                Some(1),
+            );
             if let Some(reply) = reply {
                 match reply.result {
                     Ok(_) => {
@@ -174,7 +176,6 @@ impl ClerkInner {
     ) -> Option<Option<String>> {
         let args = GetArgs {
             key,
-            op: GetEnum::AllowDuplicate,
             unique_id: self.unique_id.inc(),
         };
         let reply: GetReply = self.retry_rpc(

+ 16 - 7
kvraft/src/common.rs

@@ -72,16 +72,9 @@ pub struct PutAppendReply {
     pub result: Result<(), KVError>,
 }
 
-#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
-pub enum GetEnum {
-    AllowDuplicate,
-    NoDuplicate,
-}
-
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct GetArgs {
     pub key: String,
-    pub op: GetEnum,
 
     pub unique_id: UniqueId,
 }
@@ -96,6 +89,16 @@ pub struct KVRaftOptions {
     pub max_retry: Option<usize>,
 }
 
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct CommitSentinelArgs {
+    pub unique_id: UniqueId,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct CommitSentinelReply {
+    pub result: Result<(), KVError>,
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum KVError {
     NotLeader,
@@ -128,3 +131,9 @@ impl ValidReply for GetReply {
         self.result.is_reply_valid()
     }
 }
+
+impl ValidReply for CommitSentinelReply {
+    fn is_reply_valid(&self) -> bool {
+        self.result.is_reply_valid()
+    }
+}

+ 4 - 1
kvraft/src/lib.rs

@@ -1,5 +1,8 @@
 pub use client::Clerk;
-pub use common::{GetArgs, GetReply, PutAppendArgs, PutAppendReply};
+pub use common::{
+    CommitSentinelArgs, CommitSentinelReply, GetArgs, GetReply, PutAppendArgs,
+    PutAppendReply,
+};
 pub use remote_kvraft::RemoteKvraft;
 pub use server::KVServer;
 pub use server::UniqueKVOp;

+ 9 - 1
kvraft/src/remote_kvraft.rs

@@ -1,6 +1,9 @@
 use async_trait::async_trait;
 
-use crate::common::{GetArgs, GetReply, PutAppendArgs, PutAppendReply};
+use crate::common::{
+    CommitSentinelArgs, CommitSentinelReply, GetArgs, GetReply, PutAppendArgs,
+    PutAppendReply,
+};
 
 #[async_trait]
 pub trait RemoteKvraft: Send + Sync + 'static {
@@ -10,4 +13,9 @@ pub trait RemoteKvraft: Send + Sync + 'static {
         &self,
         args: PutAppendArgs,
     ) -> std::io::Result<PutAppendReply>;
+
+    async fn commit_sentinel(
+        &self,
+        args: CommitSentinelArgs,
+    ) -> std::io::Result<CommitSentinelReply>;
 }

+ 9 - 10
kvraft/src/server.rs

@@ -9,6 +9,7 @@ use futures::FutureExt;
 use parking_lot::Mutex;
 use serde_derive::{Deserialize, Serialize};
 
+use crate::{CommitSentinelArgs, CommitSentinelReply};
 use ruaft::{
     ApplyCommandMessage, Index, Persister, Raft, RemoteRaft, Term,
     VerifyAuthorityResult,
@@ -17,7 +18,7 @@ use test_utils::log_with;
 use test_utils::thread_local_logger::LocalLogger;
 
 use crate::common::{
-    ClerkId, GetArgs, GetEnum, GetReply, KVError, PutAppendArgs, PutAppendEnum,
+    ClerkId, GetArgs, GetReply, KVError, PutAppendArgs, PutAppendEnum,
     PutAppendReply, UniqueId,
 };
 use crate::snapshot_holder::SnapshotHolder;
@@ -430,29 +431,27 @@ impl KVServer {
 
     const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
 
-    pub async fn commit_sentinel(&self, args: GetArgs) -> GetReply {
-        assert_eq!(args.op, GetEnum::NoDuplicate);
+    pub async fn commit_sentinel(
+        &self,
+        args: CommitSentinelArgs,
+    ) -> CommitSentinelReply {
         let result_fut = self.block_for_commit(
             args.unique_id,
-            KVOp::Get(args.key),
+            KVOp::Get(String::new()),
             Self::DEFAULT_TIMEOUT,
         );
         let result = match result_fut.await {
-            Ok(CommitResult::Get(result)) => Ok(result),
+            Ok(CommitResult::Get(_)) => Ok(()),
             Ok(CommitResult::Put) => Err(KVError::Conflict),
             Ok(CommitResult::Append) => Err(KVError::Conflict),
             Err(CommitError::Duplicate(_)) => Err(KVError::Conflict),
             Err(CommitError::NotMe(_)) => Err(KVError::Conflict),
             Err(e) => Err(e.into()),
         };
-        GetReply { result }
+        CommitSentinelReply { result }
     }
 
     pub async fn get(&self, args: GetArgs) -> GetReply {
-        if args.op == GetEnum::NoDuplicate {
-            return self.commit_sentinel(args).await;
-        }
-        assert_eq!(args.op, GetEnum::AllowDuplicate);
         let result = self.block_for_read(args.key).await;
         GetReply { result }
     }

+ 19 - 2
test_configs/src/rpcs.rs

@@ -9,7 +9,8 @@ use serde::Serialize;
 use futures_util::future::BoxFuture;
 use futures_util::FutureExt;
 use kvraft::{
-    GetArgs, GetReply, KVServer, PutAppendArgs, PutAppendReply, RemoteKvraft,
+    CommitSentinelArgs, CommitSentinelReply, GetArgs, GetReply, KVServer,
+    PutAppendArgs, PutAppendReply, RemoteKvraft,
 };
 use ruaft::{
     AppendEntriesArgs, AppendEntriesReply, InstallSnapshotArgs,
@@ -77,6 +78,7 @@ impl<Command: 'static + Send + Serialize> ruaft::RemoteRaft<Command>
 
 const GET: &str = "KVServer.Get";
 const PUT_APPEND: &str = "KVServer.PutAppend";
+const COMMIT_SENTINEL: &str = "KVServer.CommitSentinel";
 
 #[async_trait]
 impl RemoteKvraft for RpcClient {
@@ -90,6 +92,13 @@ impl RemoteKvraft for RpcClient {
     ) -> std::io::Result<PutAppendReply> {
         self.call_rpc(PUT_APPEND, args).await
     }
+
+    async fn commit_sentinel(
+        &self,
+        args: CommitSentinelArgs,
+    ) -> std::io::Result<CommitSentinelReply> {
+        self.call_rpc(COMMIT_SENTINEL, args).await
+    }
 }
 
 pub fn make_rpc_handler<Request, Reply, F>(
@@ -197,10 +206,18 @@ pub fn register_kv_server<
         }),
     )?;
 
+    let kv_clone = kv.clone();
     server.register_async_rpc_handler(
         PUT_APPEND.to_owned(),
         make_async_rpc_handler(move |args| async move {
-            kv.as_ref().put_append(args).await
+            kv_clone.as_ref().put_append(args).await
+        }),
+    )?;
+
+    server.register_async_rpc_handler(
+        COMMIT_SENTINEL.to_owned(),
+        make_async_rpc_handler(move |args| async move {
+            kv.as_ref().commit_sentinel(args).await
         }),
     )?;