瀏覽代碼

Add send_install_snapshot and RPC wrappers.

Jing Yang 5 年之前
父節點
當前提交
c9a4364ae7
共有 2 個文件被更改,包括 65 次插入2 次删除
  1. 22 1
      src/install_snapshot.rs
  2. 43 1
      src/rpcs.rs

+ 22 - 1
src/install_snapshot.rs

@@ -1,4 +1,7 @@
-use crate::{Index, Peer, Raft, RaftState, State, Term};
+use crate::utils::retry_rpc;
+use crate::{
+    Index, Peer, Raft, RaftState, RpcClient, State, Term, RPC_DEADLINE,
+};
 use parking_lot::Mutex;
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -13,6 +16,7 @@ pub(crate) struct InstallSnapshotArgs {
     done: bool,
 }
 
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub(crate) struct InstallSnapshotReply {
     term: Term,
 }
@@ -68,6 +72,7 @@ impl<C: Clone + Default + serde::Serialize> Raft<C> {
         InstallSnapshotReply { term: args.term }
     }
 
+    #[allow(unused)]
     fn build_install_snapshot(
         rf: &Mutex<RaftState<C>>,
     ) -> Option<InstallSnapshotArgs> {
@@ -86,4 +91,20 @@ impl<C: Clone + Default + serde::Serialize> Raft<C> {
             done: true,
         })
     }
+
+    const INSTALL_SNAPSHOT_RETRY: usize = 1;
+    #[allow(unused)]
+    async fn send_install_snapshot(
+        rpc_client: &RpcClient,
+        args: InstallSnapshotArgs,
+    ) -> std::io::Result<bool> {
+        let term = args.term;
+        let reply = retry_rpc(
+            Self::INSTALL_SNAPSHOT_RETRY,
+            RPC_DEADLINE,
+            move |_round| rpc_client.call_install_snapshot(args.clone()),
+        )
+        .await?;
+        Ok(reply.term == term)
+    }
 }

+ 43 - 1
src/rpcs.rs

@@ -3,6 +3,7 @@ use std::sync::Arc;
 use labrpc::{Client, Network, ReplyMessage, RequestMessage, Server};
 use parking_lot::Mutex;
 
+use crate::install_snapshot::{InstallSnapshotArgs, InstallSnapshotReply};
 use crate::{
     AppendEntriesArgs, AppendEntriesReply, Raft, RequestVoteArgs,
     RequestVoteReply,
@@ -41,8 +42,23 @@ fn proxy_append_entries<
     )
 }
 
+fn proxy_install_snapshot<Command: Clone + Serialize + Default>(
+    raft: &Raft<Command>,
+    data: RequestMessage,
+) -> ReplyMessage {
+    let reply = raft.process_install_snapshot(
+        bincode::deserialize(data.as_ref())
+            .expect("Deserialization should not fail"),
+    );
+
+    ReplyMessage::from(
+        bincode::serialize(&reply).expect("Serialization should not fail"),
+    )
+}
+
 pub(crate) const REQUEST_VOTE_RPC: &str = "Raft.RequestVote";
 pub(crate) const APPEND_ENTRIES_RPC: &str = "Raft.AppendEntries";
+pub(crate) const INSTALL_SNAPSHOT_RPC: &str = "Raft.InstallSnapshot";
 
 pub struct RpcClient(Client);
 
@@ -81,6 +97,24 @@ impl RpcClient {
         Ok(bincode::deserialize(reply.as_ref())
             .expect("Deserialization of reply should not fail"))
     }
+
+    pub(crate) async fn call_install_snapshot(
+        &self,
+        request: InstallSnapshotArgs,
+    ) -> std::io::Result<InstallSnapshotReply> {
+        let data = RequestMessage::from(
+            bincode::serialize(&request)
+                .expect("Serialization of requests should not fail"),
+        );
+
+        let reply = self
+            .0
+            .call_rpc(INSTALL_SNAPSHOT_RPC.to_owned(), data)
+            .await?;
+
+        Ok(bincode::deserialize(reply.as_ref())
+            .expect("Deserialization of reply should not fail"))
+    }
 }
 
 pub fn register_server<
@@ -103,7 +137,7 @@ pub fn register_server<
         }),
     )?;
 
-    let raft_clone = raft;
+    let raft_clone = raft.clone();
     server.register_rpc_handler(
         APPEND_ENTRIES_RPC.to_owned(),
         Box::new(move |request| {
@@ -111,6 +145,14 @@ pub fn register_server<
         }),
     )?;
 
+    let raft_clone = raft;
+    server.register_rpc_handler(
+        INSTALL_SNAPSHOT_RPC.to_owned(),
+        Box::new(move |request| {
+            proxy_install_snapshot(raft_clone.as_ref(), request)
+        }),
+    )?;
+
     network.add_server(server_name, server);
 
     Ok(())