Parcourir la source

Continue moving args building out of RPC sending.

Jing Yang il y a 5 ans
Parent
commit
fa6abe32a4
1 fichiers modifiés avec 30 ajouts et 35 suppressions
  1. 30 35
      src/lib.rs

+ 30 - 35
src/lib.rs

@@ -289,7 +289,6 @@ impl Raft {
                 // RPCs are started right away.
                 let one_vote = tokio::spawn(Self::request_one_vote(
                     rpc_client,
-                    term,
                     args.clone(),
                 ));
                 // Futures must be pinned so that they have Unpin, as required
@@ -310,9 +309,9 @@ impl Raft {
     const REQUEST_VOTE_RETRY: usize = 4;
     async fn request_one_vote(
         rpc_client: RpcClient,
-        term: Term,
         args: RequestVoteArgs,
     ) -> Option<bool> {
+        let term = args.term;
         let reply = retry_rpc(Self::REQUEST_VOTE_RETRY, move |_round| {
             rpc_client.clone().call_request_vote(args.clone())
         })
@@ -442,7 +441,6 @@ impl Raft {
         Ok(())
     }
 
-    const APPEND_ENTRIES_RETRY: usize = 3;
     fn run_log_entry_daemon(
         &self,
     ) -> (
@@ -466,22 +464,14 @@ impl Raft {
                         let peer_index = i;
                         tokio::spawn(async move {
                             // TODO: cancel in flight changes?
-                            let rf_clone = rf.clone();
-                            let succeeded = retry_rpc(
-                                Self::APPEND_ENTRIES_RETRY,
-                                move |_round| {
-                                    Self::append_entries(
-                                        rf.clone(),
-                                        rpc_client.clone(),
-                                        peer_index,
-                                    )
-                                },
-                            )
-                            .await;
+                            let args =
+                                Self::build_append_entries(&rf, peer_index);
+                            let succeeded =
+                                Self::append_entries(rpc_client, args).await;
                             match succeeded {
                                 Ok(done) => {
                                     if !done {
-                                        let mut rf = rf_clone.lock();
+                                        let mut rf = rf.lock();
 
                                         let step =
                                             &mut rf.current_step[peer_index];
@@ -518,28 +508,33 @@ impl Raft {
         (handle, tx)
     }
 
+    fn build_append_entries(
+        rf: &Arc<Mutex<RaftState>>,
+        peer_index: usize,
+    ) -> AppendEntriesArgs {
+        let rf = rf.lock();
+        let (prev_log_index, prev_log_term) = rf.last_log_index_and_term();
+        AppendEntriesArgs {
+            term: rf.current_term,
+            leader_id: rf.leader_id,
+            prev_log_index,
+            prev_log_term,
+            entries: rf.log[rf.next_index[peer_index]..].to_vec(),
+            leader_commit: rf.commit_index,
+        }
+    }
+
+    const APPEND_ENTRIES_RETRY: usize = 3;
     async fn append_entries(
-        rf: Arc<Mutex<RaftState>>,
         rpc_client: RpcClient,
-        peer_index: usize,
+        args: AppendEntriesArgs,
     ) -> std::io::Result<bool> {
-        let (term, result) = {
-            let rf = rf.lock();
-            let term = rf.current_term;
-            let (prev_log_index, prev_log_term) = rf.last_log_index_and_term();
-            let result = rpc_client.call_append_entries(AppendEntriesArgs {
-                term: rf.current_term,
-                leader_id: rf.leader_id,
-                prev_log_index,
-                prev_log_term,
-                entries: rf.log[rf.next_index[peer_index]..].to_vec(),
-                leader_commit: rf.commit_index,
-            });
-            (term, result)
-        };
-        let reply = result.await?;
-        let ret = reply.term != term || reply.success;
-        Ok(ret)
+        let term = args.term;
+        let reply = retry_rpc(Self::APPEND_ENTRIES_RETRY, move |_round| {
+            rpc_client.clone().call_append_entries(args.clone())
+        })
+        .await?;
+        Ok(reply.term != term || reply.success)
     }
 }