Pārlūkot izejas kodu

Set the deadline to forever in tarpc requests.

None of the Raft RPCs would block. KVServer RPCs might block but the
clients are web servers, so we can tolerate the risk.

The servers might have a large clock skew, and thus could not handle
absolute deadlines (system time + 10 seconds) very well. The servers
with the largest system time would keep returning 'DeadlineExceeded'
errors to other servers.
Jing Yang 3 gadi atpakaļ
vecāks
revīzija
126042c0f8
3 mainītis faili ar 18 papildinājumiem un 6 dzēšanām
  1. 3 3
      durio/src/kv_service.rs
  2. 3 3
      durio/src/raft_service.rs
  3. 12 0
      durio/src/utils.rs

+ 3 - 3
durio/src/kv_service.rs

@@ -46,7 +46,7 @@ impl KVService for KVRpcServer {
 #[async_trait]
 impl RemoteKvraft for KVServiceClient {
     async fn get(&self, args: GetArgs) -> std::io::Result<GetReply> {
-        self.get(Context::current(), args)
+        self.get(crate::utils::context(), args)
             .await
             .map_err(crate::utils::translate_rpc_error)
     }
@@ -55,7 +55,7 @@ impl RemoteKvraft for KVServiceClient {
         &self,
         args: PutAppendArgs,
     ) -> std::io::Result<PutAppendReply> {
-        self.put_append(Context::current(), args)
+        self.put_append(crate::utils::context(), args)
             .await
             .map_err(crate::utils::translate_rpc_error)
     }
@@ -64,7 +64,7 @@ impl RemoteKvraft for KVServiceClient {
         &self,
         args: CommitSentinelArgs,
     ) -> std::io::Result<CommitSentinelReply> {
-        self.commit_sentinel(Context::current(), args)
+        self.commit_sentinel(crate::utils::context(), args)
             .await
             .map_err(crate::utils::translate_rpc_error)
     }

+ 3 - 3
durio/src/raft_service.rs

@@ -82,7 +82,7 @@ impl RemoteRaft<UniqueKVOp> for LazyRaftServiceClient {
     ) -> std::io::Result<RequestVoteReply> {
         self.get_or_try_init()
             .await?
-            .request_vote(Context::current(), args)
+            .request_vote(crate::utils::context(), args)
             .await
             .map_err(crate::utils::translate_rpc_error)
     }
@@ -93,7 +93,7 @@ impl RemoteRaft<UniqueKVOp> for LazyRaftServiceClient {
     ) -> std::io::Result<AppendEntriesReply> {
         self.get_or_try_init()
             .await?
-            .append_entries(Context::current(), args)
+            .append_entries(crate::utils::context(), args)
             .await
             .map_err(crate::utils::translate_rpc_error)
     }
@@ -104,7 +104,7 @@ impl RemoteRaft<UniqueKVOp> for LazyRaftServiceClient {
     ) -> std::io::Result<InstallSnapshotReply> {
         self.get_or_try_init()
             .await?
-            .install_snapshot(Context::current(), args)
+            .install_snapshot(crate::utils::context(), args)
             .await
             .map_err(crate::utils::translate_rpc_error)
     }

+ 12 - 0
durio/src/utils.rs

@@ -5,6 +5,18 @@ use futures_util::StreamExt;
 use tarpc::client::RpcError;
 use tarpc::server::{Channel, Serve};
 
+pub(crate) fn deadline_forever() -> std::time::SystemTime {
+    std::time::SystemTime::now()
+        // This is the maximum deadline allowed by tarpc / tokio_util.
+        + std::time::Duration::from_secs(2 * 365 * 24 * 60 * 60)
+}
+
+pub(crate) fn context() -> tarpc::context::Context {
+    let mut context = tarpc::context::Context::current();
+    context.deadline = deadline_forever();
+    context
+}
+
 pub(crate) fn translate_rpc_error(e: RpcError) -> std::io::Error {
     match e {
         RpcError::Disconnected => std::io::Error::new(ErrorKind::BrokenPipe, e),