ソースを参照

Reorganize install snapshot to sync_log_entry.rs.

Removed 'pub' modifiers. Renamed send_install_snapshot() to
install_snapshot() to be consistent with other RPCs.'

Move the RPC sender to sync_log_entry where its used. Move the RPC
messgae definitions to lib.rs.
Jing Yang 4 年 前
コミット
9d1a1e129d
4 ファイル変更61 行追加71 行削除
  1. 19 1
      src/lib.rs
  2. 1 62
      src/process_install_snapshot.rs
  3. 2 3
      src/rpcs.rs
  4. 39 5
      src/sync_log_entry.rs

+ 19 - 1
src/lib.rs

@@ -33,10 +33,10 @@ mod daemon_env;
 mod election;
 mod heartbeats;
 mod index_term;
-mod install_snapshot;
 mod log_array;
 mod persister;
 mod process_append_entries;
+mod process_install_snapshot;
 mod process_request_vote;
 mod raft_state;
 pub mod rpcs;
@@ -112,6 +112,24 @@ struct AppendEntriesReply {
     committed: Option<IndexTerm>,
 }
 
+#[derive(Clone, Debug, Serialize, Deserialize)]
+struct InstallSnapshotArgs {
+    pub(crate) term: Term,
+    leader_id: Peer,
+    pub(crate) last_included_index: Index,
+    last_included_term: Term,
+    // TODO(ditsing): Serde cannot handle Vec<u8> as efficient as expected.
+    data: Vec<u8>,
+    offset: usize,
+    done: bool,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+struct InstallSnapshotReply {
+    term: Term,
+    committed: Option<IndexTerm>,
+}
+
 // Commands must be
 // 0. 'static: they have to live long enough for thread pools.
 // 1. clone: they are put in vectors and request messages.

+ 1 - 62
src/install_snapshot.rs → src/process_install_snapshot.rs

@@ -1,29 +1,6 @@
 use crate::check_or_record;
 use crate::daemon_env::ErrorKind;
-use crate::index_term::IndexTerm;
-use crate::sync_log_entry::SyncLogEntryResult;
-use crate::utils::retry_rpc;
-use crate::{
-    Index, Peer, Raft, RaftState, RpcClient, State, Term, RPC_DEADLINE,
-};
-
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub(crate) struct InstallSnapshotArgs {
-    pub(crate) term: Term,
-    leader_id: Peer,
-    pub(crate) last_included_index: Index,
-    last_included_term: Term,
-    // TODO(ditsing): Serde cannot handle Vec<u8> as efficient as expected.
-    data: Vec<u8>,
-    offset: usize,
-    done: bool,
-}
-
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub(crate) struct InstallSnapshotReply {
-    term: Term,
-    committed: Option<IndexTerm>,
-}
+use crate::{InstallSnapshotArgs, InstallSnapshotReply, Raft, State};
 
 impl<C: Clone + Default + serde::Serialize> Raft<C> {
     pub(crate) fn process_install_snapshot(
@@ -114,42 +91,4 @@ impl<C: Clone + Default + serde::Serialize> Raft<C> {
             committed: None,
         }
     }
-
-    pub(crate) fn build_install_snapshot(
-        rf: &RaftState<C>,
-    ) -> InstallSnapshotArgs {
-        let (last, snapshot) = rf.log.snapshot();
-        InstallSnapshotArgs {
-            term: rf.current_term,
-            leader_id: rf.leader_id,
-            last_included_index: last.index,
-            last_included_term: last.term,
-            data: snapshot.to_owned(),
-            offset: 0,
-            done: true,
-        }
-    }
-
-    const INSTALL_SNAPSHOT_RETRY: usize = 1;
-    pub(crate) async fn send_install_snapshot(
-        rpc_client: &RpcClient,
-        args: InstallSnapshotArgs,
-    ) -> std::io::Result<SyncLogEntryResult> {
-        let term = args.term;
-        let reply = retry_rpc(
-            Self::INSTALL_SNAPSHOT_RETRY,
-            RPC_DEADLINE,
-            move |_round| rpc_client.call_install_snapshot(args.clone()),
-        )
-        .await?;
-        Ok(if reply.term == term {
-            if let Some(committed) = reply.committed {
-                SyncLogEntryResult::Archived(committed)
-            } else {
-                SyncLogEntryResult::Success
-            }
-        } else {
-            SyncLogEntryResult::TermElapsed(reply.term)
-        })
-    }
 }

+ 2 - 3
src/rpcs.rs

@@ -1,10 +1,9 @@
 use labrpc::{Client, Network, ReplyMessage, RequestMessage, Server};
 use parking_lot::Mutex;
 
-use crate::install_snapshot::{InstallSnapshotArgs, InstallSnapshotReply};
 use crate::{
-    AppendEntriesArgs, AppendEntriesReply, Raft, RequestVoteArgs,
-    RequestVoteReply,
+    AppendEntriesArgs, AppendEntriesReply, InstallSnapshotArgs,
+    InstallSnapshotReply, Raft, RequestVoteArgs, RequestVoteReply,
 };
 use serde::de::DeserializeOwned;
 use serde::Serialize;

+ 39 - 5
src/sync_log_entry.rs

@@ -5,11 +5,10 @@ use std::time::Duration;
 use parking_lot::{Condvar, Mutex};
 
 use crate::index_term::IndexTerm;
-use crate::install_snapshot::InstallSnapshotArgs;
 use crate::utils::retry_rpc;
 use crate::{
-    AppendEntriesArgs, Peer, Raft, RaftState, RpcClient, Term,
-    HEARTBEAT_INTERVAL_MILLIS, RPC_DEADLINE,
+    AppendEntriesArgs, InstallSnapshotArgs, Peer, Raft, RaftState, RpcClient,
+    Term, HEARTBEAT_INTERVAL_MILLIS, RPC_DEADLINE,
 };
 
 #[repr(align(64))]
@@ -114,8 +113,7 @@ where
                 let term = args.term;
                 let prev_log_index = args.last_included_index;
                 let match_index = args.last_included_index;
-                let succeeded =
-                    Self::send_install_snapshot(&rpc_client, args).await;
+                let succeeded = Self::install_snapshot(&rpc_client, args).await;
 
                 (term, prev_log_index, match_index, succeeded)
             }
@@ -291,4 +289,40 @@ where
             SyncLogEntryResult::TermElapsed(reply.term)
         })
     }
+
+    fn build_install_snapshot(rf: &RaftState<Command>) -> InstallSnapshotArgs {
+        let (last, snapshot) = rf.log.snapshot();
+        InstallSnapshotArgs {
+            term: rf.current_term,
+            leader_id: rf.leader_id,
+            last_included_index: last.index,
+            last_included_term: last.term,
+            data: snapshot.to_owned(),
+            offset: 0,
+            done: true,
+        }
+    }
+
+    const INSTALL_SNAPSHOT_RETRY: usize = 1;
+    async fn install_snapshot(
+        rpc_client: &RpcClient,
+        args: InstallSnapshotArgs,
+    ) -> std::io::Result<SyncLogEntryResult> {
+        let term = args.term;
+        let reply = retry_rpc(
+            Self::INSTALL_SNAPSHOT_RETRY,
+            RPC_DEADLINE,
+            move |_round| rpc_client.call_install_snapshot(args.clone()),
+        )
+        .await?;
+        Ok(if reply.term == term {
+            if let Some(committed) = reply.committed {
+                SyncLogEntryResult::Archived(committed)
+            } else {
+                SyncLogEntryResult::Success
+            }
+        } else {
+            SyncLogEntryResult::TermElapsed(reply.term)
+        })
+    }
 }