Преглед изворни кода

Wait for the RPC instead of throwing it away.

Jing Yang пре 5 година
родитељ
комит
51c4c2a7ce
1 измењених фајлова са 36 додато и 26 уклоњено
  1. 36 26
      src/lib.rs

+ 36 - 26
src/lib.rs

@@ -102,7 +102,7 @@ struct Raft {
     // applyCh: Sender<ApplyMsg>
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 struct RequestVoteArgs {
     term: Term,
     candidate_id: Peer,
@@ -110,13 +110,13 @@ struct RequestVoteArgs {
     last_log_term: Term,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 struct RequestVoteReply {
     term: Term,
     vote_granted: bool,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 struct AppendEntriesArgs {
     term: Term,
     leader_id: Peer,
@@ -126,7 +126,7 @@ struct AppendEntriesArgs {
     leader_commit: usize,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 struct AppendEntriesReply {
     term: Term,
     success: bool,
@@ -416,6 +416,7 @@ impl Raft {
                         tokio::spawn(Self::send_heartbeat(
                             rf.clone(),
                             rpc_client.clone(),
+                            peer_index,
                         ));
                     }
                 });
@@ -424,34 +425,43 @@ impl Raft {
     }
 
     const HEARTBEAT_RETRY: usize = 3;
-    async fn send_heartbeat(rf: Arc<Mutex<RaftState>>, rpc_client: RpcClient) {
-        let rf = rf.lock();
-        // copy states.
-        let term = rf.current_term;
-        let is_leader = rf.state == State::Leader;
-        let (last_log_index, last_log_term) = rf.last_log_index_and_term();
-        let commit_index = rf.commit_index;
-        let leader_id = rf.leader_id;
-        let next_index = rf.next_index[leader_id.0];
-
-        // Now we can drop the lock.
-        drop(rf);
+    async fn send_heartbeat(
+        rf: Arc<Mutex<RaftState>>,
+        rpc_client: RpcClient,
+        peer_index: usize,
+    ) -> std::io::Result<()> {
+        let (is_leader, next_index, last_log_index, args) = {
+            // Making sure locked rf is out of scope for the following await
+            let rf = rf.lock();
+            // copy states.
+            let term = rf.current_term;
+            let is_leader = rf.state == State::Leader;
+            let (last_log_index, last_log_term) = rf.last_log_index_and_term();
+            let commit_index = rf.commit_index;
+            let leader_id = rf.leader_id;
+            let next_index = rf.next_index[leader_id.0];
+
+            let args = AppendEntriesArgs {
+                term,
+                leader_id,
+                prev_log_index: last_log_index,
+                prev_log_term: last_log_term,
+                entries: vec![],
+                leader_commit: commit_index,
+            };
+            (is_leader, next_index, last_log_index, args)
+        };
 
         if is_leader {
             if next_index <= last_log_index {
                 // TODO: sync log entry instead.
             }
-            Self::retry_rpc(Self::HEARTBEAT_RETRY, |_round| {
-                rpc_client.clone().call_append_entries(AppendEntriesArgs {
-                    term,
-                    leader_id,
-                    prev_log_index: last_log_index,
-                    prev_log_term: last_log_term,
-                    entries: vec![],
-                    leader_commit: commit_index,
-                })
-            });
+            Self::retry_rpc(Self::HEARTBEAT_RETRY, move |_round| {
+                rpc_client.clone().call_append_entries(args.clone())
+            })
+            .await?;
         }
+        Ok(())
     }
 }