|
|
@@ -1,15 +1,12 @@
|
|
|
+use std::future::Future;
|
|
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
|
|
use std::sync::Once;
|
|
|
use std::time::Duration;
|
|
|
|
|
|
-use serde::de::DeserializeOwned;
|
|
|
-use serde::Serialize;
|
|
|
-
|
|
|
use crate::common::{
|
|
|
- GetArgs, GetEnum, GetReply, KVRaftOptions, PutAppendArgs, PutAppendEnum,
|
|
|
- PutAppendReply, UniqueIdSequence, GET, PUT_APPEND,
|
|
|
+ GetArgs, GetEnum, GetReply, KVError, KVRaftOptions, PutAppendArgs,
|
|
|
+ PutAppendEnum, PutAppendReply, UniqueIdSequence, ValidReply,
|
|
|
};
|
|
|
-use crate::common::{KVError, ValidReply};
|
|
|
use crate::RemoteKvraft;
|
|
|
|
|
|
pub struct Clerk {
|
|
|
@@ -98,7 +95,8 @@ impl ClerkInner {
|
|
|
op: GetEnum::NoDuplicate,
|
|
|
unique_id: self.unique_id.zero(),
|
|
|
};
|
|
|
- let reply: Option<GetReply> = self.call_rpc(GET, args, Some(1));
|
|
|
+ let reply: Option<GetReply> =
|
|
|
+ self.retry_rpc(|remote, args| remote.get(args), args, Some(1));
|
|
|
if let Some(reply) = reply {
|
|
|
match reply.result {
|
|
|
Ok(_) => {
|
|
|
@@ -119,22 +117,18 @@ impl ClerkInner {
|
|
|
}
|
|
|
|
|
|
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
|
|
|
-
|
|
|
- fn call_rpc<M, A, R>(
|
|
|
- &mut self,
|
|
|
- method: M,
|
|
|
- args: A,
|
|
|
+ pub fn retry_rpc<'a, Func, Fut, Args, Reply>(
|
|
|
+ &'a mut self,
|
|
|
+ mut future_func: Func,
|
|
|
+ args: Args,
|
|
|
max_retry: Option<usize>,
|
|
|
- ) -> Option<R>
|
|
|
+ ) -> Option<Reply>
|
|
|
where
|
|
|
- M: AsRef<str>,
|
|
|
- A: Serialize,
|
|
|
- R: DeserializeOwned + ValidReply,
|
|
|
+ Args: Clone,
|
|
|
+ Reply: ValidReply,
|
|
|
+ Fut: Future<Output = std::io::Result<Reply>> + Send + 'a,
|
|
|
+ Func: FnMut(&'a dyn RemoteKvraft, Args) -> Fut,
|
|
|
{
|
|
|
- let method = method.as_ref().to_owned();
|
|
|
- let data = bincode::serialize(&args)
|
|
|
- .expect("Serialization of requests should not fail");
|
|
|
-
|
|
|
let max_retry =
|
|
|
std::cmp::max(max_retry.unwrap_or(usize::MAX), self.servers.len());
|
|
|
|
|
|
@@ -144,7 +138,7 @@ impl ClerkInner {
|
|
|
let rpc_response = self.executor.block_on(async {
|
|
|
tokio::time::timeout(
|
|
|
Self::DEFAULT_TIMEOUT,
|
|
|
- client.call_rpc(method.clone(), data.clone()),
|
|
|
+ future_func(client.as_ref(), args.clone()),
|
|
|
)
|
|
|
.await
|
|
|
});
|
|
|
@@ -152,9 +146,7 @@ impl ClerkInner {
|
|
|
Ok(reply) => reply,
|
|
|
Err(e) => Err(e.into()),
|
|
|
};
|
|
|
- if let Ok(reply) = reply {
|
|
|
- let ret: R = bincode::deserialize(reply.as_ref())
|
|
|
- .expect("Deserialization of reply should not fail");
|
|
|
+ if let Ok(ret) = reply {
|
|
|
if ret.is_reply_valid() {
|
|
|
self.last_server_index.store(index, Ordering::Relaxed);
|
|
|
return Some(ret);
|
|
|
@@ -185,7 +177,11 @@ impl ClerkInner {
|
|
|
op: GetEnum::AllowDuplicate,
|
|
|
unique_id: self.unique_id.inc(),
|
|
|
};
|
|
|
- let reply: GetReply = self.call_rpc(GET, args, options.max_retry)?;
|
|
|
+ let reply: GetReply = self.retry_rpc(
|
|
|
+ |remote, args| remote.get(args),
|
|
|
+ args,
|
|
|
+ options.max_retry,
|
|
|
+ )?;
|
|
|
match reply.result {
|
|
|
Ok(val) => Some(val),
|
|
|
Err(KVError::Conflict) => panic!("We should never see a conflict."),
|
|
|
@@ -215,8 +211,11 @@ impl ClerkInner {
|
|
|
op,
|
|
|
unique_id: self.unique_id.inc(),
|
|
|
};
|
|
|
- let reply: PutAppendReply =
|
|
|
- self.call_rpc(PUT_APPEND, args, options.max_retry)?;
|
|
|
+ let reply: PutAppendReply = self.retry_rpc(
|
|
|
+ |remote, args| remote.put_append(args),
|
|
|
+ args,
|
|
|
+ options.max_retry,
|
|
|
+ )?;
|
|
|
match reply.result {
|
|
|
Ok(val) => Some(val),
|
|
|
Err(KVError::Expired) => Some(()),
|