Quellcode durchsuchen

Create an abstract layer on top of RPC interfaces for remote peers.

A new trait RemoteRuaft is added to represent a Raft peer. The RPC layer
should provide an implementation that sends messages to the peer and
conveys the response back.

With RemoteRuaft, Ruaft can be run on different RPC layers easily.

An implementation of RemoteRuaft must be static because it will be sent
between threads.
Jing Yang vor 4 Jahren
Ursprung
Commit
ba5494ad08
10 geänderte Dateien mit 115 neuen und 31 gelöschten Zeilen
  1. 1 0
      Cargo.toml
  2. 2 2
      kvraft/src/server.rs
  3. 2 2
      kvraft/src/testing_utils/config.rs
  4. 4 4
      src/election.rs
  5. 4 5
      src/heartbeats.rs
  6. 15 10
      src/lib.rs
  7. 52 0
      src/remote_raft.rs
  8. 27 0
      src/rpcs.rs
  9. 6 6
      src/sync_log_entries.rs
  10. 2 2
      tests/config/mod.rs

+ 1 - 0
Cargo.toml

@@ -13,6 +13,7 @@ homepage = "https://github.com/ditsing/ruaft"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 
 [dependencies]
 [dependencies]
+async-trait = "0.1"
 bincode = "1.3.3"
 bincode = "1.3.3"
 bytes = "1.0"
 bytes = "1.0"
 crossbeam-utils = "0.8"
 crossbeam-utils = "0.8"

+ 2 - 2
kvraft/src/server.rs

@@ -8,7 +8,7 @@ use std::time::Duration;
 use parking_lot::{Condvar, Mutex};
 use parking_lot::{Condvar, Mutex};
 use serde_derive::{Deserialize, Serialize};
 use serde_derive::{Deserialize, Serialize};
 
 
-use ruaft::{ApplyCommandMessage, Persister, Raft, RpcClient, Term};
+use ruaft::{ApplyCommandMessage, Persister, Raft, RemoteRaft, Term};
 use test_utils::thread_local_logger::LocalLogger;
 use test_utils::thread_local_logger::LocalLogger;
 
 
 use crate::common::{
 use crate::common::{
@@ -94,7 +94,7 @@ impl From<CommitError> for KVError {
 
 
 impl KVServer {
 impl KVServer {
     pub fn new(
     pub fn new(
-        servers: Vec<RpcClient>,
+        servers: Vec<impl RemoteRaft<UniqueKVOp> + 'static>,
         me: usize,
         me: usize,
         persister: Arc<dyn Persister>,
         persister: Arc<dyn Persister>,
         max_state_size_bytes: Option<usize>,
         max_state_size_bytes: Option<usize>,

+ 2 - 2
kvraft/src/testing_utils/config.rs

@@ -6,7 +6,7 @@ use rand::seq::SliceRandom;
 use rand::thread_rng;
 use rand::thread_rng;
 
 
 use ruaft::rpcs::register_server;
 use ruaft::rpcs::register_server;
-use ruaft::{Persister, RpcClient};
+use ruaft::Persister;
 
 
 use crate::client::Clerk;
 use crate::client::Clerk;
 use crate::server::KVServer;
 use crate::server::KVServer;
@@ -48,7 +48,7 @@ impl Config {
         {
         {
             let mut network = self.network.lock();
             let mut network = self.network.lock();
             for j in 0..self.server_count {
             for j in 0..self.server_count {
-                clients.push(RpcClient::new(network.make_client(
+                clients.push(ruaft::rpcs::RpcClient::new(network.make_client(
                     Self::client_name(index, j),
                     Self::client_name(index, j),
                     Self::server_name(j),
                     Self::server_name(j),
                 )))
                 )))

+ 4 - 4
src/election.rs

@@ -8,7 +8,7 @@ use rand::{thread_rng, Rng};
 use crate::daemon_env::Daemon;
 use crate::daemon_env::Daemon;
 use crate::term_marker::TermMarker;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, RPC_DEADLINE};
 use crate::utils::{retry_rpc, RPC_DEADLINE};
-use crate::{Peer, Raft, RaftState, RequestVoteArgs, RpcClient, State, Term};
+use crate::{Peer, Raft, RaftState, RemoteRaft, RequestVoteArgs, State, Term};
 
 
 #[derive(Default)]
 #[derive(Default)]
 pub(crate) struct ElectionState {
 pub(crate) struct ElectionState {
@@ -281,16 +281,16 @@ where
 
 
     const REQUEST_VOTE_RETRY: usize = 1;
     const REQUEST_VOTE_RETRY: usize = 1;
     async fn request_vote(
     async fn request_vote(
-        rpc_client: Arc<RpcClient>,
+        rpc_client: impl RemoteRaft<Command>,
         args: RequestVoteArgs,
         args: RequestVoteArgs,
         term_marker: TermMarker<Command>,
         term_marker: TermMarker<Command>,
     ) -> Option<bool> {
     ) -> Option<bool> {
         let term = args.term;
         let term = args.term;
         // See the comment in send_heartbeat() for this override.
         // See the comment in send_heartbeat() for this override.
-        let rpc_client = rpc_client.as_ref();
+        let rpc_client = &rpc_client;
         let reply =
         let reply =
             retry_rpc(Self::REQUEST_VOTE_RETRY, RPC_DEADLINE, move |_round| {
             retry_rpc(Self::REQUEST_VOTE_RETRY, RPC_DEADLINE, move |_round| {
-                rpc_client.call_request_vote(args.clone())
+                rpc_client.request_vote(args.clone())
             })
             })
             .await;
             .await;
         if let Ok(reply) = reply {
         if let Ok(reply) = reply {

+ 4 - 5
src/heartbeats.rs

@@ -1,12 +1,11 @@
 use std::sync::atomic::Ordering;
 use std::sync::atomic::Ordering;
-use std::sync::Arc;
 use std::time::Duration;
 use std::time::Duration;
 
 
 use parking_lot::Mutex;
 use parking_lot::Mutex;
 
 
 use crate::term_marker::TermMarker;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, RPC_DEADLINE};
 use crate::utils::{retry_rpc, RPC_DEADLINE};
-use crate::{AppendEntriesArgs, Raft, RaftState, RpcClient};
+use crate::{AppendEntriesArgs, Raft, RaftState, RemoteRaft};
 
 
 // Command must be
 // Command must be
 // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
 // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
@@ -79,7 +78,7 @@ where
 
 
     const HEARTBEAT_RETRY: usize = 1;
     const HEARTBEAT_RETRY: usize = 1;
     async fn send_heartbeat(
     async fn send_heartbeat(
-        rpc_client: Arc<RpcClient>,
+        rpc_client: impl RemoteRaft<Command>,
         args: AppendEntriesArgs<Command>,
         args: AppendEntriesArgs<Command>,
         term_watermark: TermMarker<Command>,
         term_watermark: TermMarker<Command>,
     ) -> std::io::Result<()> {
     ) -> std::io::Result<()> {
@@ -97,10 +96,10 @@ where
         // Another option is to use non-move closures, in which case rpc_client
         // Another option is to use non-move closures, in which case rpc_client
         // of type Arc can be passed-in directly. However that requires args to
         // of type Arc can be passed-in directly. However that requires args to
         // be sync because they can be shared by more than one futures.
         // be sync because they can be shared by more than one futures.
-        let rpc_client = rpc_client.as_ref();
+        let rpc_client = &rpc_client;
         let response =
         let response =
             retry_rpc(Self::HEARTBEAT_RETRY, RPC_DEADLINE, move |_round| {
             retry_rpc(Self::HEARTBEAT_RETRY, RPC_DEADLINE, move |_round| {
-                rpc_client.call_append_entries(args.clone())
+                rpc_client.append_entries(args.clone())
             })
             })
             .await?;
             .await?;
         term_watermark.mark(response.term);
         term_watermark.mark(response.term);

+ 15 - 10
src/lib.rs

@@ -16,7 +16,7 @@ use crate::persister::PersistedRaftState;
 pub use crate::persister::Persister;
 pub use crate::persister::Persister;
 pub(crate) use crate::raft_state::RaftState;
 pub(crate) use crate::raft_state::RaftState;
 pub(crate) use crate::raft_state::State;
 pub(crate) use crate::raft_state::State;
-pub use crate::rpcs::RpcClient;
+pub use crate::remote_raft::RemoteRaft;
 pub use crate::snapshot::Snapshot;
 pub use crate::snapshot::Snapshot;
 use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
 use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
 
 
@@ -31,6 +31,7 @@ mod process_append_entries;
 mod process_install_snapshot;
 mod process_install_snapshot;
 mod process_request_vote;
 mod process_request_vote;
 mod raft_state;
 mod raft_state;
+mod remote_raft;
 pub mod rpcs;
 pub mod rpcs;
 mod snapshot;
 mod snapshot;
 mod sync_log_entries;
 mod sync_log_entries;
@@ -56,7 +57,7 @@ struct LogEntry<Command> {
 #[derive(Clone)]
 #[derive(Clone)]
 pub struct Raft<Command> {
 pub struct Raft<Command> {
     inner_state: Arc<Mutex<RaftState<Command>>>,
     inner_state: Arc<Mutex<RaftState<Command>>>,
-    peers: Vec<Arc<RpcClient>>,
+    peers: Vec<Arc<dyn RemoteRaft<Command>>>,
 
 
     me: Peer,
     me: Peer,
 
 
@@ -75,7 +76,7 @@ pub struct Raft<Command> {
 }
 }
 
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 #[derive(Clone, Debug, Serialize, Deserialize)]
-struct RequestVoteArgs {
+pub struct RequestVoteArgs {
     term: Term,
     term: Term,
     candidate_id: Peer,
     candidate_id: Peer,
     last_log_index: Index,
     last_log_index: Index,
@@ -83,13 +84,13 @@ struct RequestVoteArgs {
 }
 }
 
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 #[derive(Clone, Debug, Serialize, Deserialize)]
-struct RequestVoteReply {
+pub struct RequestVoteReply {
     term: Term,
     term: Term,
     vote_granted: bool,
     vote_granted: bool,
 }
 }
 
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 #[derive(Clone, Debug, Serialize, Deserialize)]
-struct AppendEntriesArgs<Command> {
+pub struct AppendEntriesArgs<Command> {
     term: Term,
     term: Term,
     leader_id: Peer,
     leader_id: Peer,
     prev_log_index: Index,
     prev_log_index: Index,
@@ -99,14 +100,14 @@ struct AppendEntriesArgs<Command> {
 }
 }
 
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 #[derive(Clone, Debug, Serialize, Deserialize)]
-struct AppendEntriesReply {
+pub struct AppendEntriesReply {
     term: Term,
     term: Term,
     success: bool,
     success: bool,
     committed: Option<IndexTerm>,
     committed: Option<IndexTerm>,
 }
 }
 
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 #[derive(Clone, Debug, Serialize, Deserialize)]
-struct InstallSnapshotArgs {
+pub struct InstallSnapshotArgs {
     pub(crate) term: Term,
     pub(crate) term: Term,
     leader_id: Peer,
     leader_id: Peer,
     pub(crate) last_included_index: Index,
     pub(crate) last_included_index: Index,
@@ -118,7 +119,7 @@ struct InstallSnapshotArgs {
 }
 }
 
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 #[derive(Clone, Debug, Serialize, Deserialize)]
-struct InstallSnapshotReply {
+pub struct InstallSnapshotReply {
     term: Term,
     term: Term,
     committed: Option<IndexTerm>,
     committed: Option<IndexTerm>,
 }
 }
@@ -144,7 +145,7 @@ where
     /// Each instance will create at least 4 + (number of peers) threads. The
     /// Each instance will create at least 4 + (number of peers) threads. The
     /// extensive usage of threads is to minimize latency.
     /// extensive usage of threads is to minimize latency.
     pub fn new(
     pub fn new(
-        peers: Vec<RpcClient>,
+        peers: Vec<impl RemoteRaft<Command> + 'static>,
         me: usize,
         me: usize,
         persister: Arc<dyn Persister>,
         persister: Arc<dyn Persister>,
         apply_command: impl ApplyCommandFnMut<Command>,
         apply_command: impl ApplyCommandFnMut<Command>,
@@ -190,7 +191,11 @@ where
             .on_thread_stop(ThreadEnv::detach)
             .on_thread_stop(ThreadEnv::detach)
             .build()
             .build()
             .expect("Creating thread pool should not fail");
             .expect("Creating thread pool should not fail");
-        let peers = peers.into_iter().map(Arc::new).collect();
+        let peers = peers
+            .into_iter()
+            .map(|r| Arc::new(r) as Arc<dyn RemoteRaft<Command>>)
+            .collect();
+
         let mut this = Raft {
         let mut this = Raft {
             inner_state: Arc::new(Mutex::new(state)),
             inner_state: Arc::new(Mutex::new(state)),
             peers,
             peers,

+ 52 - 0
src/remote_raft.rs

@@ -0,0 +1,52 @@
+use async_trait::async_trait;
+
+use crate::{
+    AppendEntriesArgs, AppendEntriesReply, InstallSnapshotArgs,
+    InstallSnapshotReply, RequestVoteArgs, RequestVoteReply,
+};
+
+#[async_trait]
+pub trait RemoteRaft<Command>: Send + Sync {
+    async fn request_vote(
+        &self,
+        args: RequestVoteArgs,
+    ) -> std::io::Result<RequestVoteReply>;
+
+    async fn append_entries(
+        &self,
+        args: AppendEntriesArgs<Command>,
+    ) -> std::io::Result<AppendEntriesReply>;
+
+    async fn install_snapshot(
+        &self,
+        args: InstallSnapshotArgs,
+    ) -> std::io::Result<InstallSnapshotReply>;
+}
+
+#[async_trait]
+impl<Command, R> RemoteRaft<Command> for R
+where
+    Command: Send + 'static,
+    R: AsRef<dyn RemoteRaft<Command>> + Send + Sync,
+{
+    async fn request_vote(
+        &self,
+        args: RequestVoteArgs,
+    ) -> std::io::Result<RequestVoteReply> {
+        self.as_ref().request_vote(args).await
+    }
+
+    async fn append_entries(
+        &self,
+        args: AppendEntriesArgs<Command>,
+    ) -> std::io::Result<AppendEntriesReply> {
+        self.as_ref().append_entries(args).await
+    }
+
+    async fn install_snapshot(
+        &self,
+        args: InstallSnapshotArgs,
+    ) -> std::io::Result<InstallSnapshotReply> {
+        self.as_ref().install_snapshot(args).await
+    }
+}

+ 27 - 0
src/rpcs.rs

@@ -1,3 +1,4 @@
+use async_trait::async_trait;
 use labrpc::{Client, Network, ReplyMessage, RequestMessage, Server};
 use labrpc::{Client, Network, ReplyMessage, RequestMessage, Server};
 use parking_lot::Mutex;
 use parking_lot::Mutex;
 
 
@@ -62,6 +63,32 @@ impl RpcClient {
     }
     }
 }
 }
 
 
+#[async_trait]
+impl<Command: 'static + Send + Serialize>
+    crate::remote_raft::RemoteRaft<Command> for RpcClient
+{
+    async fn request_vote(
+        &self,
+        args: RequestVoteArgs,
+    ) -> std::io::Result<RequestVoteReply> {
+        self.call_request_vote(args).await
+    }
+
+    async fn append_entries(
+        &self,
+        args: AppendEntriesArgs<Command>,
+    ) -> std::io::Result<AppendEntriesReply> {
+        self.call_append_entries(args).await
+    }
+
+    async fn install_snapshot(
+        &self,
+        args: InstallSnapshotArgs,
+    ) -> std::io::Result<InstallSnapshotReply> {
+        self.call_install_snapshot(args).await
+    }
+}
+
 pub fn make_rpc_handler<Request, Reply, F>(
 pub fn make_rpc_handler<Request, Reply, F>(
     func: F,
     func: F,
 ) -> Box<dyn Fn(RequestMessage) -> ReplyMessage>
 ) -> Box<dyn Fn(RequestMessage) -> ReplyMessage>

+ 6 - 6
src/sync_log_entries.rs

@@ -10,7 +10,7 @@ use crate::index_term::IndexTerm;
 use crate::term_marker::TermMarker;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, RPC_DEADLINE};
 use crate::utils::{retry_rpc, RPC_DEADLINE};
 use crate::{
 use crate::{
-    AppendEntriesArgs, InstallSnapshotArgs, Peer, Raft, RaftState, RpcClient,
+    AppendEntriesArgs, InstallSnapshotArgs, Peer, Raft, RaftState, RemoteRaft,
     Term, HEARTBEAT_INTERVAL_MILLIS,
     Term, HEARTBEAT_INTERVAL_MILLIS,
 };
 };
 
 
@@ -154,7 +154,7 @@ where
     /// and a committed log entry can never diverge.
     /// and a committed log entry can never diverge.
     async fn sync_log_entries(
     async fn sync_log_entries(
         rf: Arc<Mutex<RaftState<Command>>>,
         rf: Arc<Mutex<RaftState<Command>>>,
-        rpc_client: Arc<RpcClient>,
+        rpc_client: impl RemoteRaft<Command>,
         peer_index: usize,
         peer_index: usize,
         rerun: std::sync::mpsc::Sender<Option<Peer>>,
         rerun: std::sync::mpsc::Sender<Option<Peer>>,
         opening: Arc<AtomicUsize>,
         opening: Arc<AtomicUsize>,
@@ -378,14 +378,14 @@ where
 
 
     const APPEND_ENTRIES_RETRY: usize = 1;
     const APPEND_ENTRIES_RETRY: usize = 1;
     async fn append_entries(
     async fn append_entries(
-        rpc_client: &RpcClient,
+        rpc_client: &dyn RemoteRaft<Command>,
         args: AppendEntriesArgs<Command>,
         args: AppendEntriesArgs<Command>,
     ) -> std::io::Result<SyncLogEntriesResult> {
     ) -> std::io::Result<SyncLogEntriesResult> {
         let term = args.term;
         let term = args.term;
         let reply = retry_rpc(
         let reply = retry_rpc(
             Self::APPEND_ENTRIES_RETRY,
             Self::APPEND_ENTRIES_RETRY,
             RPC_DEADLINE,
             RPC_DEADLINE,
-            move |_round| rpc_client.call_append_entries(args.clone()),
+            move |_round| rpc_client.append_entries(args.clone()),
         )
         )
         .await?;
         .await?;
         Ok(if reply.term == term {
         Ok(if reply.term == term {
@@ -418,14 +418,14 @@ where
 
 
     const INSTALL_SNAPSHOT_RETRY: usize = 1;
     const INSTALL_SNAPSHOT_RETRY: usize = 1;
     async fn install_snapshot(
     async fn install_snapshot(
-        rpc_client: &RpcClient,
+        rpc_client: &dyn RemoteRaft<Command>,
         args: InstallSnapshotArgs,
         args: InstallSnapshotArgs,
     ) -> std::io::Result<SyncLogEntriesResult> {
     ) -> std::io::Result<SyncLogEntriesResult> {
         let term = args.term;
         let term = args.term;
         let reply = retry_rpc(
         let reply = retry_rpc(
             Self::INSTALL_SNAPSHOT_RETRY,
             Self::INSTALL_SNAPSHOT_RETRY,
             RPC_DEADLINE,
             RPC_DEADLINE,
-            move |_round| rpc_client.call_install_snapshot(args.clone()),
+            move |_round| rpc_client.install_snapshot(args.clone()),
         )
         )
         .await?;
         .await?;
         Ok(if reply.term == term {
         Ok(if reply.term == term {

+ 2 - 2
tests/config/mod.rs

@@ -11,7 +11,7 @@ use rand::{thread_rng, Rng};
 use tokio::time::Duration;
 use tokio::time::Duration;
 
 
 use ruaft::rpcs::register_server;
 use ruaft::rpcs::register_server;
-use ruaft::{ApplyCommandMessage, Persister, Raft, RpcClient, Term};
+use ruaft::{ApplyCommandMessage, Persister, Raft, Term};
 
 
 pub mod persister;
 pub mod persister;
 
 
@@ -304,7 +304,7 @@ impl Config {
         {
         {
             let mut network = self.network.lock();
             let mut network = self.network.lock();
             for j in 0..self.server_count {
             for j in 0..self.server_count {
-                clients.push(RpcClient::new(network.make_client(
+                clients.push(ruaft::rpcs::RpcClient::new(network.make_client(
                     Self::client_name(index, j),
                     Self::client_name(index, j),
                     Self::server_name(j),
                     Self::server_name(j),
                 )))
                 )))