Procházet zdrojové kódy

Do not connect to the server's own Raft service.

Otherwise we'll have a chiken and egg problem. The KV server
expects all remote Raft servers to be up, before it starts its only
Raft server, whicch is one of the "remote" servers.

Created an optional wrapper around the remote Raft service to achieve
that.
Jing Yang před 4 roky
rodič
revize
d5b0508aa3
3 změnil soubory, kde provedl 33 přidání a 10 odebrání
  1. 3 1
      durio/src/main.rs
  2. 21 6
      durio/src/raft_service.rs
  3. 9 3
      durio/src/run.rs

+ 3 - 1
durio/src/main.rs

@@ -19,7 +19,9 @@ struct PutAppendBody {
 
 #[tokio::main]
 async fn main() {
-    run_kv_instance(([127, 0, 0, 1], 9988).into(), vec![], 0)
+    let kv_addr = ([127, 0, 0, 1], 9988).into();
+    let raft_addr = ([127, 0, 0, 1], 10001).into();
+    run_kv_instance(kv_addr, vec![raft_addr], 0)
         .await
         .expect("Running kv instance should not fail");
 

+ 21 - 6
durio/src/raft_service.rs

@@ -52,13 +52,18 @@ impl RaftService for RaftRpcServer {
     }
 }
 
+pub(crate) struct OptionalRaftServiceClient(Option<RaftServiceClient>);
+
 #[async_trait]
-impl RemoteRaft<UniqueKVOp> for RaftServiceClient {
+impl RemoteRaft<UniqueKVOp> for OptionalRaftServiceClient {
     async fn request_vote(
         &self,
         args: RequestVoteArgs,
     ) -> std::io::Result<RequestVoteReply> {
-        self.request_vote(Context::current(), args)
+        self.0
+            .as_ref()
+            .unwrap()
+            .request_vote(Context::current(), args)
             .await
             .map_err(crate::utils::translate_rpc_error)
     }
@@ -67,7 +72,10 @@ impl RemoteRaft<UniqueKVOp> for RaftServiceClient {
         &self,
         args: AppendEntriesArgs<UniqueKVOp>,
     ) -> std::io::Result<AppendEntriesReply> {
-        self.append_entries(Context::current(), args)
+        self.0
+            .as_ref()
+            .unwrap()
+            .append_entries(Context::current(), args)
             .await
             .map_err(crate::utils::translate_rpc_error)
     }
@@ -76,15 +84,22 @@ impl RemoteRaft<UniqueKVOp> for RaftServiceClient {
         &self,
         args: InstallSnapshotArgs,
     ) -> std::io::Result<InstallSnapshotReply> {
-        self.install_snapshot(Context::current(), args)
+        self.0
+            .as_ref()
+            .unwrap()
+            .install_snapshot(Context::current(), args)
             .await
             .map_err(crate::utils::translate_rpc_error)
     }
 }
 
+pub(crate) fn no_raft_service() -> OptionalRaftServiceClient {
+    OptionalRaftServiceClient(None)
+}
+
 pub(crate) async fn connect_to_raft_service(
     addr: SocketAddr,
-) -> std::io::Result<impl RemoteRaft<UniqueKVOp>> {
+) -> std::io::Result<OptionalRaftServiceClient> {
     let conn = tarpc::serde_transport::tcp::connect(
         addr,
         tokio_serde::formats::Json::default,
@@ -92,7 +107,7 @@ pub(crate) async fn connect_to_raft_service(
     .await?;
     let client =
         RaftServiceClient::new(tarpc::client::Config::default(), conn).spawn();
-    Ok(client)
+    Ok(OptionalRaftServiceClient(Some(client)))
 }
 
 pub(crate) fn start_raft_service_server(

+ 9 - 3
durio/src/run.rs

@@ -5,7 +5,9 @@ use kvraft::KVServer;
 
 use crate::kv_service::start_kv_service_server;
 use crate::persister::Persister;
-use crate::raft_service::{connect_to_raft_service, start_raft_service_server};
+use crate::raft_service::{
+    connect_to_raft_service, no_raft_service, start_raft_service_server,
+};
 
 pub(crate) async fn run_kv_instance(
     addr: SocketAddr,
@@ -13,8 +15,12 @@ pub(crate) async fn run_kv_instance(
     me: usize,
 ) -> std::io::Result<()> {
     let mut remote_rafts = vec![];
-    for raft_peer in &raft_peers {
-        let remote_raft = connect_to_raft_service(*raft_peer).await?;
+    for (index, raft_peer) in raft_peers.iter().enumerate() {
+        let remote_raft = if index == me {
+            no_raft_service()
+        } else {
+            connect_to_raft_service(*raft_peer).await?
+        };
         remote_rafts.push(remote_raft);
     }