Преглед изворни кода

Implement the log entry syncing code.

Jing Yang пре 5 година
родитељ
комит
c00c177400
1 измењених фајлова са 102 додато и 8 уклоњено
  1. 102 8
      src/lib.rs

+ 102 - 8
src/lib.rs

@@ -414,7 +414,6 @@ impl Raft {
                         tokio::spawn(Self::send_heartbeat(
                             rf.clone(),
                             rpc_client.clone(),
-                            peer_index,
                         ));
                     }
                 });
@@ -426,9 +425,8 @@ impl Raft {
     async fn send_heartbeat(
         rf: Arc<Mutex<RaftState>>,
         rpc_client: RpcClient,
-        peer_index: usize,
     ) -> std::io::Result<()> {
-        let (is_leader, next_index, last_log_index, args) = {
+        let (is_leader, args) = {
             // Making sure locked rf is out of scope for the following await
             let rf = rf.lock();
             // copy states.
@@ -437,7 +435,6 @@ impl Raft {
             let (last_log_index, last_log_term) = rf.last_log_index_and_term();
             let commit_index = rf.commit_index;
             let leader_id = rf.leader_id;
-            let next_index = rf.next_index[peer_index.0];
 
             let args = AppendEntriesArgs {
                 term,
@@ -447,13 +444,10 @@ impl Raft {
                 entries: vec![],
                 leader_commit: commit_index,
             };
-            (is_leader, next_index, last_log_index, args)
+            (is_leader, args)
         };
 
         if is_leader {
-            if next_index <= last_log_index {
-                // TODO: sync log entry instead.
-            }
             Self::retry_rpc(Self::HEARTBEAT_RETRY, move |_round| {
                 rpc_client.clone().call_append_entries(args.clone())
             })
@@ -461,6 +455,106 @@ impl Raft {
         }
         Ok(())
     }
+
+    const APPEND_ENTRIES_RETRY: usize = 3;
+    fn run_log_entry_daemon(
+        &self,
+    ) -> (
+        std::thread::JoinHandle<()>,
+        std::sync::mpsc::Sender<Option<Peer>>,
+    ) {
+        let (tx, rx) = std::sync::mpsc::channel::<Option<Peer>>();
+
+        // Clone everything that the thread needs.
+        let rerun = tx.clone();
+        let peers = self.peers.clone();
+        let rf = self.inner_state.clone();
+        let me = self.me;
+        let handle = std::thread::spawn(move || {
+            while let Ok(peer) = rx.recv() {
+                for (i, rpc_client) in peers.iter().enumerate() {
+                    if i != me.0 && peer.map(|p| p.0 == i).unwrap_or(true) {
+                        let rf = rf.clone();
+                        let rpc_client = rpc_client.clone();
+                        let rerun = rerun.clone();
+                        let peer_index = i;
+                        tokio::spawn(async move {
+                            // TODO: cancel in flight changes?
+                            let rf_clone = rf.clone();
+                            let succeeded = Self::retry_rpc(
+                                Self::APPEND_ENTRIES_RETRY,
+                                move |_round| {
+                                    Self::append_entries(
+                                        rf.clone(),
+                                        rpc_client.clone(),
+                                        peer_index,
+                                    )
+                                },
+                            )
+                            .await;
+                            match succeeded {
+                                Ok(done) => {
+                                    if !done {
+                                        let mut rf = rf_clone.lock();
+
+                                        let step =
+                                            &mut rf.current_step[peer_index];
+                                        *step += 1;
+                                        let diff = (1 << 8) << *step;
+
+                                        let next_index =
+                                            &mut rf.next_index[peer_index];
+                                        if diff >= *next_index {
+                                            *next_index = 1usize;
+                                        } else {
+                                            *next_index -= diff;
+                                        }
+
+                                        rerun.send(Some(Peer(peer_index)));
+                                    }
+                                }
+                                Err(_) => {
+                                    tokio::time::delay_for(
+                                        Duration::from_millis(
+                                            HEARTBEAT_INTERVAL_MILLIS,
+                                        ),
+                                    )
+                                    .await;
+                                    rerun.send(Some(Peer(peer_index)));
+                                }
+                            };
+                        });
+                    }
+                }
+            }
+        });
+
+        (handle, tx)
+    }
+
+    async fn append_entries(
+        rf: Arc<Mutex<RaftState>>,
+        rpc_client: RpcClient,
+        peer_index: usize,
+    ) -> std::io::Result<bool> {
+        let (term, result) = {
+            let rf = rf.lock();
+            let term = rf.current_term;
+            let (prev_log_index, prev_log_term) = rf.last_log_index_and_term();
+            let result = rpc_client.call_append_entries(AppendEntriesArgs {
+                term: rf.current_term,
+                leader_id: rf.leader_id,
+                prev_log_index,
+                prev_log_term,
+                entries: rf.log[rf.next_index[peer_index]..].to_vec(),
+                leader_commit: rf.commit_index,
+            });
+            (term, result)
+        };
+        let reply = result.await?;
+        let ret = reply.term != term || reply.success;
+        Ok(ret)
+    }
 }
 
 const HEARTBEAT_INTERVAL_MILLIS: u64 = 150;