소스 검색

Add RPC deadline for all RPCs.

Jing Yang 5 년 전
부모
커밋
af140e1c46
2개의 변경된 파일19개의 추가작업 그리고 23개의 파일을 삭제
  1. 12 21
      src/lib.rs
  2. 7 2
      src/utils.rs

+ 12 - 21
src/lib.rs

@@ -460,10 +460,11 @@ impl Raft {
         args: RequestVoteArgs,
     ) -> Option<bool> {
         let term = args.term;
-        let reply = retry_rpc(Self::REQUEST_VOTE_RETRY, move |_round| {
-            rpc_client.clone().call_request_vote(args.clone())
-        })
-        .await;
+        let reply =
+            retry_rpc(Self::REQUEST_VOTE_RETRY, RPC_DEADLINE, move |_round| {
+                rpc_client.clone().call_request_vote(args.clone())
+            })
+            .await;
         if let Ok(reply) = reply {
             return Some(reply.vote_granted && reply.term == term);
         }
@@ -586,7 +587,7 @@ impl Raft {
         rpc_client: RpcClient,
         args: AppendEntriesArgs,
     ) -> std::io::Result<()> {
-        retry_rpc(Self::HEARTBEAT_RETRY, move |_round| {
+        retry_rpc(Self::HEARTBEAT_RETRY, RPC_DEADLINE, move |_round| {
             rpc_client.clone().call_append_entries(args.clone())
         })
         .await?;
@@ -654,19 +655,7 @@ impl Raft {
         };
         let term = args.term;
         let match_index = args.prev_log_index + args.entries.len();
-        let result = tokio::time::timeout(
-            RPC_DEADLINE,
-            Self::append_entries(rpc_client, args),
-        )
-        .await;
-
-        let succeeded = match result {
-            Ok(succeeded) => succeeded,
-            Err(_) => {
-                let _ = rerun.send(Some(Peer(peer_index)));
-                return;
-            }
-        };
+        let succeeded = Self::append_entries(rpc_client, args).await;
         match succeeded {
             Ok(Some(succeeded)) => {
                 if succeeded {
@@ -750,9 +739,11 @@ impl Raft {
         args: AppendEntriesArgs,
     ) -> std::io::Result<Option<bool>> {
         let term = args.term;
-        let reply = retry_rpc(Self::APPEND_ENTRIES_RETRY, move |_round| {
-            rpc_client.clone().call_append_entries(args.clone())
-        })
+        let reply = retry_rpc(
+            Self::APPEND_ENTRIES_RETRY,
+            RPC_DEADLINE,
+            move |_round| rpc_client.clone().call_append_entries(args.clone()),
+        )
         .await?;
         Ok(if reply.term == term {
             Some(reply.success)

+ 7 - 2
src/utils.rs

@@ -3,6 +3,7 @@ use std::time::Duration;
 
 pub async fn retry_rpc<Func, Fut, T>(
     max_retry: usize,
+    deadline: Duration,
     mut task_gen: Func,
 ) -> std::io::Result<T>
 where
@@ -13,8 +14,12 @@ where
         if i != 0 {
             tokio::time::delay_for(Duration::from_millis((1 << i) * 10)).await;
         }
-        if let Ok(reply) = task_gen(i).await {
-            return Ok(reply);
+        // Not timed-out.
+        if let Ok(reply) = tokio::time::timeout(deadline, task_gen(i)).await {
+            // And no error
+            if let Ok(reply) = reply {
+                return Ok(reply);
+            }
         }
     }
     Err(std::io::Error::new(