Kaynağa Gözat

Implement the logic to send install_snapshot RPCs.

Jing Yang 5 yıl önce
ebeveyn
işleme
afe8fa2033
2 değiştirilmiş dosya ile 60 ekleme ve 28 silme
  1. src/install_snapshot.rs — +10 −17
  2. src/lib.rs — +50 −11

+ 10 - 17
src/install_snapshot.rs

@@ -2,13 +2,12 @@ use crate::utils::retry_rpc;
 use crate::{
     Index, Peer, Raft, RaftState, RpcClient, State, Term, RPC_DEADLINE,
 };
-use parking_lot::Mutex;
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub(crate) struct InstallSnapshotArgs {
-    term: Term,
+    pub(crate) term: Term,
     leader_id: Peer,
-    last_included_index: Index,
+    pub(crate) last_included_index: Index,
     last_included_term: Term,
     // TODO(ditsing): this seems less efficient.
     data: Vec<u8>,
@@ -72,16 +71,11 @@ impl<C: Clone + Default + serde::Serialize> Raft<C> {
         InstallSnapshotReply { term: args.term }
     }
 
-    #[allow(unused)]
-    fn build_install_snapshot(
-        rf: &Mutex<RaftState<C>>,
-    ) -> Option<InstallSnapshotArgs> {
-        let rf = rf.lock();
-        if !rf.is_leader() {
-            return None;
-        }
+    pub(crate) fn build_install_snapshot(
+        rf: &RaftState<C>,
+    ) -> InstallSnapshotArgs {
         let (last, snapshot) = rf.log.snapshot();
-        Some(InstallSnapshotArgs {
+        InstallSnapshotArgs {
             term: rf.current_term,
             leader_id: rf.leader_id,
             last_included_index: last.index,
@@ -89,15 +83,14 @@ impl<C: Clone + Default + serde::Serialize> Raft<C> {
             data: snapshot.to_owned(),
             offset: 0,
             done: true,
-        })
+        }
     }
 
     const INSTALL_SNAPSHOT_RETRY: usize = 1;
-    #[allow(unused)]
-    async fn send_install_snapshot(
+    pub(crate) async fn send_install_snapshot(
         rpc_client: &RpcClient,
         args: InstallSnapshotArgs,
-    ) -> std::io::Result<bool> {
+    ) -> std::io::Result<Option<bool>> {
         let term = args.term;
         let reply = retry_rpc(
             Self::INSTALL_SNAPSHOT_RETRY,
@@ -105,6 +98,6 @@ impl<C: Clone + Default + serde::Serialize> Raft<C> {
             move |_round| rpc_client.call_install_snapshot(args.clone()),
         )
         .await?;
-        Ok(reply.term == term)
+        Ok(if reply.term == term { Some(true) } else { None })
     }
 }

+ 50 - 11
src/lib.rs

@@ -16,6 +16,7 @@ use crossbeam_utils::sync::WaitGroup;
 use parking_lot::{Condvar, Mutex};
 use rand::{thread_rng, Rng};
 
+use crate::install_snapshot::InstallSnapshotArgs;
 use crate::persister::PersistedRaftState;
 pub use crate::persister::Persister;
 pub(crate) use crate::raft_state::RaftState;
@@ -320,6 +321,12 @@ where
     }
 }
 
+enum SyncLogEntryOperation<Command> {
+    AppendEntries(AppendEntriesArgs<Command>),
+    InstallSnapshot(InstallSnapshotArgs),
+    None,
+}
+
 // Command must be
 // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
 // 1. clone: they are copied to the persister.
@@ -685,13 +692,26 @@ where
             return;
         }
 
-        let args = match Self::build_append_entries(&rf, peer_index) {
-            Some(args) => args,
-            None => return,
+        let operation = Self::build_sync_log_entry(&rf, peer_index);
+        let (term, match_index, succeeded) = match operation {
+            SyncLogEntryOperation::AppendEntries(args) => {
+                let term = args.term;
+                let match_index = args.prev_log_index + args.entries.len();
+                let succeeded = Self::append_entries(&rpc_client, args).await;
+
+                (term, match_index, succeeded)
+            }
+            SyncLogEntryOperation::InstallSnapshot(args) => {
+                let term = args.term;
+                let match_index = args.last_included_index;
+                let succeeded =
+                    Self::send_install_snapshot(&rpc_client, args).await;
+
+                (term, match_index, succeeded)
+            }
+            SyncLogEntryOperation::None => return,
         };
-        let term = args.term;
-        let match_index = args.prev_log_index + args.entries.len();
-        let succeeded = Self::append_entries(&rpc_client, args).await;
+
         match succeeded {
             Ok(Some(true)) => {
                 let mut rf = rf.lock();
@@ -750,24 +770,43 @@ where
         };
     }
 
-    fn build_append_entries(
+    fn build_sync_log_entry(
         rf: &Mutex<RaftState<Command>>,
         peer_index: usize,
-    ) -> Option<AppendEntriesArgs<Command>> {
+    ) -> SyncLogEntryOperation<Command> {
         let rf = rf.lock();
         if !rf.is_leader() {
-            return None;
+            return SyncLogEntryOperation::None;
         }
+
+        // To send AppendEntries request, next_index must be strictly larger
+        // than start(). Otherwise we won't be able to know the log term of the
+        // entry right before next_index.
+        return if rf.next_index[peer_index] > rf.log.start() {
+            SyncLogEntryOperation::AppendEntries(Self::build_append_entries(
+                &rf, peer_index,
+            ))
+        } else {
+            SyncLogEntryOperation::InstallSnapshot(
+                Self::build_install_snapshot(&rf),
+            )
+        };
+    }
+
+    fn build_append_entries(
+        rf: &RaftState<Command>,
+        peer_index: usize,
+    ) -> AppendEntriesArgs<Command> {
         let prev_log_index = rf.next_index[peer_index] - 1;
         let prev_log_term = rf.log[prev_log_index].term;
-        Some(AppendEntriesArgs {
+        AppendEntriesArgs {
             term: rf.current_term,
             leader_id: rf.leader_id,
             prev_log_index,
             prev_log_term,
             entries: rf.log.after(rf.next_index[peer_index]).to_vec(),
             leader_commit: rf.commit_index,
-        })
+        }
     }
 
     const APPEND_ENTRIES_RETRY: usize = 1;