Browse Source

Implement clerk without re-initialize.

Jing Yang 5 năm trước cách đây
mục cha
commit
712a8cc735
2 tập tin đã thay đổi với 91 bổ sung29 xóa
  1. 88 12
      tests/kvraft/client.rs
  2. 3 17
      tests/kvraft/common.rs

+ 88 - 12
tests/kvraft/client.rs

@@ -1,17 +1,18 @@
 use super::common::{
-    call_rpc, GetArgs, GetReply, PutAppendArgs, PutAppendReply,
+    GetArgs, GetReply, KVRaftOptions, KvError, PutAppendArgs, PutAppendReply,
     UniqueIdSequence, GET, PUT_APPEND,
 };
-use labrpc::Client;
+use crate::kvraft::common::PutAppendEnum;
+use labrpc::{Client, RequestMessage};
+use serde::de::DeserializeOwned;
+use serde::Serialize;
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Once;
 
 struct Clerk {
     servers: Vec<Client>,
 
     last_server_index: AtomicUsize,
     unique_id: UniqueIdSequence,
-    init: Once,
 
     executor: tokio::runtime::Runtime,
 }
@@ -23,7 +24,6 @@ impl Clerk {
 
             last_server_index: AtomicUsize::new(0),
             unique_id: UniqueIdSequence::new(),
-            init: Once::new(),
 
             executor: tokio::runtime::Builder::new_multi_thread()
                 .thread_name("kvraft-clerk")
@@ -35,15 +35,12 @@ impl Clerk {
 
     fn commit_sentinel(&mut self) {
         loop {
-            let index = self.server_index();
-            let client = &self.servers[index];
             let args = GetArgs {
                 key: "".to_string(),
                 unique_id: self.unique_id.zero(),
             };
-            let reply: labrpc::Result<GetReply> =
-                self.executor.block_on(call_rpc(client, GET, args));
-            if let Ok(reply) = reply {
+            let reply: Option<GetReply> = self.call_rpc(GET, args, Some(1));
+            if let Some(reply) = reply {
                 match reply.result {
                     Ok(_) => {
                         if !reply.is_retry {
@@ -63,7 +60,86 @@ impl Clerk {
         }
     }
 
-    fn server_index(&self) -> usize {
-        self.last_server_index.load(Ordering::Relaxed)
+    fn call_rpc<M: AsRef<str>, A: Clone + Serialize, R: DeserializeOwned>(
+        &mut self,
+        method: M,
+        args: A,
+        max_retry: Option<usize>,
+    ) -> Option<R> {
+        let method = method.as_ref().to_owned();
+        let data = RequestMessage::from(
+            bincode::serialize(&args)
+                .expect("Serialization of requests should not fail"),
+        );
+
+        for _ in 0..max_retry.unwrap_or(usize::MAX) {
+            let index = self.last_server_index.load(Ordering::Relaxed);
+            let client = &self.servers[index];
+
+            let reply = self
+                .executor
+                .block_on(client.call_rpc(method.clone(), data.clone()));
+            if let Ok(reply) = reply {
+                let ret = bincode::deserialize(reply.as_ref())
+                    .expect("Deserialization of reply should not fail");
+                self.last_server_index.store(index, Ordering::Relaxed);
+                return Some(ret);
+            }
+        }
+        None
+    }
+
+    pub fn get(
+        &mut self,
+        key: String,
+        options: KVRaftOptions,
+    ) -> Option<String> {
+        let args = GetArgs {
+            key,
+            unique_id: self.unique_id.inc(),
+        };
+        let reply: GetReply = self.call_rpc(GET, args, options.max_retry)?;
+        match reply.result {
+            Ok(val) => Some(val),
+            Err(KvError::NoKey) => Some(Default::default()),
+            _ => None,
+        }
+    }
+
+    fn put_append(
+        &mut self,
+        key: String,
+        value: String,
+        op: PutAppendEnum,
+        options: KVRaftOptions,
+    ) -> Option<()> {
+        let args = PutAppendArgs {
+            key,
+            value,
+            op,
+            unique_id: self.unique_id.inc(),
+        };
+        let reply: PutAppendReply =
+            self.call_rpc(PUT_APPEND, args, options.max_retry)?;
+        assert!(reply.result.is_ok());
+        Some(())
+    }
+
+    pub fn put(
+        &mut self,
+        key: String,
+        value: String,
+        options: KVRaftOptions,
+    ) -> Option<()> {
+        self.put_append(key, value, PutAppendEnum::Put, options)
+    }
+
+    pub fn append(
+        &mut self,
+        key: String,
+        value: String,
+        options: KVRaftOptions,
+    ) -> Option<()> {
+        self.put_append(key, value, PutAppendEnum::Append, options)
     }
 }

+ 3 - 17
tests/kvraft/common.rs

@@ -1,7 +1,4 @@
-use labrpc::{Client, RequestMessage};
 use rand::{thread_rng, RngCore};
-use serde::de::DeserializeOwned;
-use serde::Serialize;
 use std::sync::atomic::{AtomicU64, Ordering};
 
 #[derive(
@@ -84,18 +81,7 @@ pub struct GetReply {
     pub is_retry: bool,
 }
 
-pub async fn call_rpc<M: AsRef<str>, A: Serialize, R: DeserializeOwned>(
-    client: &Client,
-    method: M,
-    args: A,
-) -> labrpc::Result<R> {
-    let data = RequestMessage::from(
-        bincode::serialize(&args)
-            .expect("Serialization of requests should not fail"),
-    );
-
-    let reply = client.call_rpc(method.as_ref().to_owned(), data).await?;
-
-    Ok(bincode::deserialize(reply.as_ref())
-        .expect("Deserialization of reply should not fail"))
+#[derive(Clone, Debug, Default)]
+pub struct KVRaftOptions {
+    pub max_retry: Option<usize>,
 }