Sfoglia il codice sorgente

Move retry_rpc to utils.

Jing Yang 5 anni fa
parent
commit
f3b1c4b8e4
2 ha cambiato i file con 27 aggiunte e 23 eliminazioni
  1. 5 23
      src/lib.rs
  2. 22 0
      src/utils.rs

+ 5 - 23
src/lib.rs

@@ -17,8 +17,10 @@ use parking_lot::Mutex;
 use rand::{thread_rng, Rng};
 
 use crate::rpcs::RpcClient;
+use crate::utils::retry_rpc;
 
 pub mod rpcs;
+mod utils;
 
 #[derive(Eq, PartialEq)]
 enum State {
@@ -301,26 +303,6 @@ impl Raft {
         ));
     }
 
-    async fn retry_rpc<Func, Fut, T>(
-        max_retry: usize,
-        mut task_gen: Func,
-    ) -> std::io::Result<T>
-    where
-        Fut: Future<Output = std::io::Result<T>> + Send + 'static,
-        Func: FnMut(usize) -> Fut,
-    {
-        for i in 0..max_retry {
-            if let Ok(reply) = task_gen(i).await {
-                return Ok(reply);
-            }
-            tokio::time::delay_for(Duration::from_millis((1 << i) * 10)).await;
-        }
-        Err(std::io::Error::new(
-            std::io::ErrorKind::TimedOut,
-            format!("Timed out after {} retries", max_retry),
-        ))
-    }
-
     async fn request_one_vote(
         rpc_client: RpcClient,
         term: Term,
@@ -328,7 +310,7 @@ impl Raft {
         last_log_index: usize,
         last_log_term: Term,
     ) -> Option<bool> {
-        let reply = Self::retry_rpc(4, move |_round| {
+        let reply = retry_rpc(4, move |_round| {
             rpc_client.clone().call_request_vote(RequestVoteArgs {
                 term,
                 candidate_id: me,
@@ -448,7 +430,7 @@ impl Raft {
         };
 
         if is_leader {
-            Self::retry_rpc(Self::HEARTBEAT_RETRY, move |_round| {
+            retry_rpc(Self::HEARTBEAT_RETRY, move |_round| {
                 rpc_client.clone().call_append_entries(args.clone())
             })
             .await?;
@@ -481,7 +463,7 @@ impl Raft {
                         tokio::spawn(async move {
                             // TODO: cancel in flight changes?
                             let rf_clone = rf.clone();
-                            let succeeded = Self::retry_rpc(
+                            let succeeded = retry_rpc(
                                 Self::APPEND_ENTRIES_RETRY,
                                 move |_round| {
                                     Self::append_entries(

+ 22 - 0
src/utils.rs

@@ -0,0 +1,22 @@
+use std::future::Future;
+use std::time::Duration;
+
+pub async fn retry_rpc<Func, Fut, T>(
+    max_retry: usize,
+    mut task_gen: Func,
+) -> std::io::Result<T>
+where
+    Fut: Future<Output = std::io::Result<T>> + Send + 'static,
+    Func: FnMut(usize) -> Fut,
+{
+    for i in 0..max_retry {
+        if let Ok(reply) = task_gen(i).await {
+            return Ok(reply);
+        }
+        tokio::time::delay_for(Duration::from_millis((1 << i) * 10)).await;
+    }
+    Err(std::io::Error::new(
+        std::io::ErrorKind::TimedOut,
+        format!("Timed out after {} retries", max_retry),
+    ))
+}