Prechádzať zdrojové kódy

Send RPCs right away in elections.

Jing Yang 5 rokov pred
rodič
commit
54dcf65e2e
1 zmenil súbory, kde vykonal 7 pridanie a 6 odobranie
  1. 7 6
      src/lib.rs

+ 7 - 6
src/lib.rs

@@ -301,7 +301,8 @@ impl Raft {
                 // Make a clone now so that self will not be passed across await
                 // boundary.
                 let rpc_client = self.peers[i].clone();
-                let one_vote = async move {
+                // RPCs are started right away.
+                let one_vote = tokio::spawn(async move {
                     let reply_future = Self::retry_rpc(4, move |_round| {
                         rpc_client.clone().call_request_vote(RequestVoteArgs {
                             term,
@@ -314,10 +315,10 @@ impl Raft {
                         return Some(reply.vote_granted && reply.term == term);
                     }
                     return None;
-                };
+                });
                 // Futures must be pinned so that they have Unpin, as required
                 // by futures::future::select.
-                votes.push(Box::pin(one_vote));
+                votes.push(one_vote);
             }
         }
 
@@ -333,7 +334,7 @@ impl Raft {
     async fn count_vote_util_cancelled(
         term: Term,
         rf: Arc<Mutex<RaftState>>,
-        votes: Vec<impl Future<Output = Option<bool>> + Unpin>,
+        votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
         majority: usize,
         cancel_token: futures::channel::oneshot::Receiver<Term>,
     ) {
@@ -342,7 +343,7 @@ impl Raft {
         let mut cancel_token = cancel_token;
         let mut futures_vec = votes;
         while vote_count < majority && against_count <= majority {
-            // Running futures-rs futures on tokio. Fingers crossed.
+            // Mixing tokio futures with futures-rs ones. Fingers crossed.
             let selected = futures::future::select(
                 cancel_token,
                 futures::future::select_all(futures_vec),
@@ -356,7 +357,7 @@ impl Raft {
             futures_vec = rest;
             cancel_token = new_token;
 
-            if let Some(vote) = one_vote {
+            if let Ok(Some(vote)) = one_vote {
                 if vote {
                     vote_count += 1
                 } else {