Jelajahi Sumber

Separate RPC sending and args building.

The args buiding part can and should usually be done synchronously.
This way a lot of reference clones can be saved.
Jing Yang 5 tahun lalu
induk
melakukan
538ec51c91
1 mengubah file dengan 38 tambahan dan 32 penghapusan
  1. 38 32
      src/lib.rs

+ 38 - 32
src/lib.rs

@@ -391,48 +391,54 @@ impl Raft {
                     loop {
                         // TODO: shutdown signal or cancel token.
                         interval.tick().await;
-                        tokio::spawn(Self::send_heartbeat(
-                            rf.clone(),
-                            rpc_client.clone(),
-                        ));
+                        if let Some(args) = Self::build_heartbeat(&rf) {
+                            tokio::spawn(Self::send_heartbeat(
+                                rpc_client.clone(),
+                                args,
+                            ));
+                        }
                     }
                 });
             }
         }
     }
 
+    fn build_heartbeat(
+        rf: &Arc<Mutex<RaftState>>,
+    ) -> Option<AppendEntriesArgs> {
+        let rf = rf.lock();
+
+        // copy states.
+        let term = rf.current_term;
+        let is_leader = rf.state == State::Leader;
+        let (last_log_index, last_log_term) = rf.last_log_index_and_term();
+        let commit_index = rf.commit_index;
+        let leader_id = rf.leader_id;
+
+        if !is_leader {
+            return None;
+        }
+
+        let args = AppendEntriesArgs {
+            term,
+            leader_id,
+            prev_log_index: last_log_index,
+            prev_log_term: last_log_term,
+            entries: vec![],
+            leader_commit: commit_index,
+        };
+        Some(args)
+    }
+
     const HEARTBEAT_RETRY: usize = 3;
     async fn send_heartbeat(
-        rf: Arc<Mutex<RaftState>>,
         rpc_client: RpcClient,
+        args: AppendEntriesArgs,
     ) -> std::io::Result<()> {
-        let (is_leader, args) = {
-            // Making sure locked rf is out of scope for the following await
-            let rf = rf.lock();
-            // copy states.
-            let term = rf.current_term;
-            let is_leader = rf.state == State::Leader;
-            let (last_log_index, last_log_term) = rf.last_log_index_and_term();
-            let commit_index = rf.commit_index;
-            let leader_id = rf.leader_id;
-
-            let args = AppendEntriesArgs {
-                term,
-                leader_id,
-                prev_log_index: last_log_index,
-                prev_log_term: last_log_term,
-                entries: vec![],
-                leader_commit: commit_index,
-            };
-            (is_leader, args)
-        };
-
-        if is_leader {
-            retry_rpc(Self::HEARTBEAT_RETRY, move |_round| {
-                rpc_client.clone().call_append_entries(args.clone())
-            })
-            .await?;
-        }
+        retry_rpc(Self::HEARTBEAT_RETRY, move |_round| {
+            rpc_client.clone().call_append_entries(args.clone())
+        })
+        .await?;
         Ok(())
     }