Przeglądaj źródła

Schedule heartbeats.

Jing Yang 5 lat temu
rodzic
commit
473e626cdc
1 zmienionych plików z 60 dodań i 2 usunięć
  1. 60 2
      src/lib.rs

+ 60 - 2
src/lib.rs

@@ -167,8 +167,7 @@ impl Raft {
         }
 
         let voted_for = rf.voted_for;
-        let last_log_index = rf.log.len() - 1;
-        let last_log_term = rf.log.last().unwrap().term;
+        let (last_log_index, last_log_term) = rf.last_log_index_and_term();
         if (voted_for.is_none() || voted_for == Some(args.candidate_id))
             && (args.last_log_term > last_log_term
                 || (args.last_log_term == last_log_term
@@ -401,6 +400,59 @@ impl Raft {
         rf.election_timer.take();
         rf.persist();
     }
+
+    fn schedule_heartbeats(&self, interval: Duration) {
+        for (peer_index, rpc_client) in self.peers.iter().enumerate() {
+            if peer_index != self.me.0 {
+                // Interval and rf are now owned by the outer async function.
+                let mut interval = tokio::time::interval(interval);
+                let rf = self.inner_state.clone();
+                // RPC client must be cloned into the outer async function.
+                let rpc_client = rpc_client.clone();
+                tokio::spawn(async move {
+                    loop {
+                        // TODO: shutdown signal or cancel token.
+                        interval.tick().await;
+                        tokio::spawn(Self::send_heartbeat(
+                            rf.clone(),
+                            rpc_client.clone(),
+                        ));
+                    }
+                });
+            }
+        }
+    }
+
+    const HEARTBEAT_RETRY: usize = 3;
+    async fn send_heartbeat(rf: Arc<Mutex<RaftState>>, rpc_client: RpcClient) {
+        let rf = rf.lock();
+        // copy states.
+        let term = rf.current_term;
+        let is_leader = rf.state == State::Leader;
+        let (last_log_index, last_log_term) = rf.last_log_index_and_term();
+        let commit_index = rf.commit_index;
+        let leader_id = rf.leader_id;
+        let next_index = rf.next_index[leader_id.0];
+
+        // Now we can drop the lock.
+        drop(rf);
+
+        if is_leader {
+            if next_index <= last_log_index {
+                // TODO: sync log entry instead.
+            }
+            Self::retry_rpc(Self::HEARTBEAT_RETRY, |_round| {
+                rpc_client.clone().call_append_entries(AppendEntriesArgs {
+                    term,
+                    leader_id,
+                    prev_log_index: last_log_index,
+                    prev_log_term: last_log_term,
+                    entries: vec![],
+                    leader_commit: commit_index,
+                })
+            });
+        }
+    }
 }
 
 const HEARTBEAT_INTERVAL_MILLIS: u64 = 150;
@@ -432,4 +484,10 @@ impl RaftState {
     fn persist(&self) {
         // TODO: implement
     }
+
+    fn last_log_index_and_term(&self) -> (usize, Term) {
+        let len = self.log.len();
+        assert!(len > 0, "There should always be at least one entry in log");
+        (len - 1, self.log.last().unwrap().term)
+    }
 }