Prechádzať zdrojové kódy

Refactor: request one vote is a function now.

Jing Yang 5 rokov pred
rodič
commit
e4a24f7c0c
1 zmenil súbory, kde vykonal 49 pridanie a 37 odobranie
  1. 49 37
      src/lib.rs

+ 49 - 37
src/lib.rs

@@ -249,26 +249,6 @@ impl Raft {
         }
     }
 
-    async fn retry_rpc<Func, Fut, T>(
-        max_retry: usize,
-        mut task_gen: Func,
-    ) -> std::io::Result<T>
-    where
-        Fut: Future<Output = std::io::Result<T>> + Send + 'static,
-        Func: FnMut(usize) -> Fut,
-    {
-        for i in 0..max_retry {
-            if let Ok(reply) = task_gen(i).await {
-                return Ok(reply);
-            }
-            tokio::time::delay_for(Duration::from_millis((1 << i) * 10)).await;
-        }
-        Err(std::io::Error::new(
-            std::io::ErrorKind::TimedOut,
-            format!("Timed out after {} retries", max_retry),
-        ))
-    }
-
     fn run_election(&self) {
         let (term, last_log_index, last_log_term, cancel_token) = {
             let mut rf = self.inner_state.lock();
@@ -298,24 +278,14 @@ impl Raft {
         let mut votes = vec![];
         for i in 0..self.peers.len() {
             if i != self.me.0 {
-                // Make a clone now so that self will not be passed across await
-                // boundary.
-                let rpc_client = self.peers[i].clone();
                 // RPCs are started right away.
-                let one_vote = tokio::spawn(async move {
-                    let reply_future = Self::retry_rpc(4, move |_round| {
-                        rpc_client.clone().call_request_vote(RequestVoteArgs {
-                            term,
-                            candidate_id: me,
-                            last_log_index,
-                            last_log_term,
-                        })
-                    });
-                    if let Ok(reply) = reply_future.await {
-                        return Some(reply.vote_granted && reply.term == term);
-                    }
-                    return None;
-                });
+                let one_vote = tokio::spawn(Self::request_one_vote(
+                    self.peers[i].clone(),
+                    term,
+                    me,
+                    last_log_index,
+                    last_log_term,
+                ));
                 // Futures must be pinned so that they have Unpin, as required
                 // by futures::future::select.
                 votes.push(one_vote);
@@ -331,6 +301,48 @@ impl Raft {
         ));
     }
 
+    async fn retry_rpc<Func, Fut, T>(
+        max_retry: usize,
+        mut task_gen: Func,
+    ) -> std::io::Result<T>
+    where
+        Fut: Future<Output = std::io::Result<T>> + Send + 'static,
+        Func: FnMut(usize) -> Fut,
+    {
+        for i in 0..max_retry {
+            if let Ok(reply) = task_gen(i).await {
+                return Ok(reply);
+            }
+            tokio::time::delay_for(Duration::from_millis((1 << i) * 10)).await;
+        }
+        Err(std::io::Error::new(
+            std::io::ErrorKind::TimedOut,
+            format!("Timed out after {} retries", max_retry),
+        ))
+    }
+
+    async fn request_one_vote(
+        rpc_client: RpcClient,
+        term: Term,
+        me: Peer,
+        last_log_index: usize,
+        last_log_term: Term,
+    ) -> Option<bool> {
+        let reply = Self::retry_rpc(4, move |_round| {
+            rpc_client.clone().call_request_vote(RequestVoteArgs {
+                term,
+                candidate_id: me,
+                last_log_index,
+                last_log_term,
+            })
+        })
+        .await;
+        if let Ok(reply) = reply {
+            return Some(reply.vote_granted && reply.term == term);
+        }
+        return None;
+    }
+
     async fn count_vote_util_cancelled(
         term: Term,
         rf: Arc<Mutex<RaftState>>,