Преглед изворни кода

Move rpcs.rs to a separate sub-crate.

The goal is to eventually move ruaft main crate off labrpc. A couple of
test-only utility classes had to be created to allow existing tests to
be moved out of the main crate.
Jing Yang пре 4 година
родитељ
комит
4a0c62887c

+ 3 - 1
Cargo.toml

@@ -19,7 +19,6 @@ bytes = "1.0"
 crossbeam-utils = "0.8"
 futures-channel = "0.3.15"
 futures-util = "0.3.15"
-labrpc = "0.1.12"
 log = "0.4"
 parking_lot = "0.11.1"
 rand = "0.8"
@@ -35,9 +34,11 @@ integration-test = ["test_utils"]
 [dev-dependencies]
 anyhow = "1.0"
 futures = { version = "0.3.15", features = ["thread-pool"] }
+labrpc = "0.1.12"
 ruaft = { path = ".", features = ["integration-test"] }
 scopeguard = "1.1.0"
 stdext = "0.3"
+test_configs = { path = "test_configs" }
 test_utils = { path = "test_utils" }
 kvraft = { path = "kvraft" }
 
@@ -45,5 +46,6 @@ kvraft = { path = "kvraft" }
 members = [
     "kvraft",
     "linearizability",
+    "test_configs",
     "test_utils",
 ]

+ 1 - 0
kvraft/Cargo.toml

@@ -13,6 +13,7 @@ ruaft = { path = "..", features = ["integration-test"] }
 linearizability = { path = "../linearizability" }
 serde = "1.0.116"
 serde_derive = "1.0.116"
+test_configs = { path = "../test_configs" }
 test_utils = { path = "../test_utils" }
 tokio = { version = "1.7", features = ["time", "parking_lot"] }
 log = "0.4.14"

+ 2 - 2
kvraft/src/testing_utils/config.rs

@@ -5,8 +5,8 @@ use parking_lot::Mutex;
 use rand::seq::SliceRandom;
 use rand::thread_rng;
 
-use ruaft::rpcs::register_server;
 use ruaft::Persister;
+use test_configs::register_server;
 
 use crate::client::Clerk;
 use crate::server::KVServer;
@@ -48,7 +48,7 @@ impl Config {
         {
             let mut network = self.network.lock();
             for j in 0..self.server_count {
-                clients.push(ruaft::rpcs::RpcClient::new(network.make_client(
+                clients.push(test_configs::RpcClient::new(network.make_client(
                     Self::client_name(index, j),
                     Self::server_name(j),
                 )))

+ 1 - 1
kvraft/src/testing_utils/rpcs.rs

@@ -1,7 +1,7 @@
 use labrpc::{Network, Server};
 use parking_lot::Mutex;
 
-use ruaft::rpcs::make_rpc_handler;
+use test_configs::make_rpc_handler;
 
 use crate::common::{GET, PUT_APPEND};
 use crate::server::KVServer;

+ 0 - 1
src/lib.rs

@@ -32,7 +32,6 @@ mod process_install_snapshot;
 mod process_request_vote;
 mod raft_state;
 mod remote_raft;
-pub mod rpcs;
 mod snapshot;
 mod sync_log_entries;
 mod term_marker;

+ 1 - 1
src/process_append_entries.rs

@@ -12,7 +12,7 @@ impl<Command> Raft<Command>
 where
     Command: Clone + serde::Serialize + Default,
 {
-    pub(crate) fn process_append_entries(
+    pub fn process_append_entries(
         &self,
         args: AppendEntriesArgs<Command>,
     ) -> AppendEntriesReply {

+ 1 - 1
src/process_install_snapshot.rs

@@ -3,7 +3,7 @@ use crate::daemon_env::ErrorKind;
 use crate::{InstallSnapshotArgs, InstallSnapshotReply, Raft, State};
 
 impl<C: Clone + Default + serde::Serialize> Raft<C> {
-    pub(crate) fn process_install_snapshot(
+    pub fn process_install_snapshot(
         &self,
         args: InstallSnapshotArgs,
     ) -> InstallSnapshotReply {

+ 1 - 1
src/process_request_vote.rs

@@ -8,7 +8,7 @@ impl<Command> Raft<Command>
 where
     Command: Clone + serde::Serialize + Default,
 {
-    pub(crate) fn process_request_vote(
+    pub fn process_request_vote(
         &self,
         args: RequestVoteArgs,
     ) -> RequestVoteReply {

+ 49 - 0
src/utils.rs

@@ -30,3 +30,52 @@ where
 }
 
 pub const RPC_DEADLINE: Duration = Duration::from_secs(2);
+
+#[cfg(feature = "integration-test")]
+pub mod integration_test {
+    use crate::{
+        AppendEntriesArgs, AppendEntriesReply, Peer, RequestVoteArgs,
+        RequestVoteReply, Term,
+    };
+
+    pub fn make_request_vote_args(
+        term: Term,
+        peer_id: usize,
+        last_log_index: usize,
+        last_log_term: Term,
+    ) -> RequestVoteArgs {
+        RequestVoteArgs {
+            term,
+            candidate_id: Peer(peer_id),
+            last_log_index,
+            last_log_term,
+        }
+    }
+
+    pub fn make_append_entries_args<Command>(
+        term: Term,
+        leader_id: usize,
+        prev_log_index: usize,
+        prev_log_term: Term,
+        leader_commit: usize,
+    ) -> AppendEntriesArgs<Command> {
+        AppendEntriesArgs {
+            term,
+            leader_id: Peer(leader_id),
+            prev_log_index,
+            prev_log_term,
+            entries: vec![],
+            leader_commit,
+        }
+    }
+
+    pub fn unpack_request_vote_reply(reply: RequestVoteReply) -> (Term, bool) {
+        (reply.term, reply.vote_granted)
+    }
+
+    pub fn unpack_append_entries_reply(
+        reply: AppendEntriesReply,
+    ) -> (Term, bool) {
+        (reply.term, reply.success)
+    }
+}

+ 18 - 0
test_configs/Cargo.toml

@@ -0,0 +1,18 @@
+[package]
+name = "test_configs"
+version = "0.1.0"
+edition = "2018"
+
+[dependencies]
+async-trait = "0.1"
+bincode = "1.3.3"
+labrpc = "0.1.12"
+parking_lot = "0.11.1"
+ruaft = { path = "..", features = ["integration-test"] }
+serde = "1.0.116"
+
+[dev-dependencies]
+bytes = "1.0"
+futures = { version = "0.3.15", features = ["thread-pool"] }
+test_utils = { path = "../test_utils" }
+stdext = "0.3"

+ 3 - 0
test_configs/src/lib.rs

@@ -0,0 +1,3 @@
+mod rpcs;
+
+pub use rpcs::{make_rpc_handler, register_server, RpcClient};

+ 24 - 28
src/rpcs.rs → test_configs/src/rpcs.rs

@@ -1,13 +1,13 @@
 use async_trait::async_trait;
 use labrpc::{Client, Network, ReplyMessage, RequestMessage, Server};
 use parking_lot::Mutex;
+use serde::de::DeserializeOwned;
+use serde::Serialize;
 
-use crate::{
+use ruaft::{
     AppendEntriesArgs, AppendEntriesReply, InstallSnapshotArgs,
     InstallSnapshotReply, Raft, RequestVoteArgs, RequestVoteReply,
 };
-use serde::de::DeserializeOwned;
-use serde::Serialize;
 
 const REQUEST_VOTE_RPC: &str = "Raft.RequestVote";
 const APPEND_ENTRIES_RPC: &str = "Raft.AppendEntries";
@@ -43,8 +43,8 @@ impl RpcClient {
 }
 
 #[async_trait]
-impl<Command: 'static + Send + Serialize>
-    crate::remote_raft::RemoteRaft<Command> for RpcClient
+impl<Command: 'static + Send + Serialize> ruaft::RemoteRaft<Command>
+    for RpcClient
 {
     async fn request_vote(
         &self,
@@ -136,12 +136,16 @@ mod tests {
 
     use bytes::Bytes;
 
-    use crate::{ApplyCommandMessage, Peer, RemoteRaft, Term};
+    use ruaft::{ApplyCommandMessage, RemoteRaft, Term};
 
     use super::*;
+    use ruaft::utils::integration_test::{
+        make_append_entries_args, make_request_vote_args,
+        unpack_append_entries_reply, unpack_request_vote_reply,
+    };
 
-    type DoNothingPersister = ();
-    impl crate::Persister for DoNothingPersister {
+    struct DoNothingPersister;
+    impl ruaft::Persister for DoNothingPersister {
         fn read_state(&self) -> Bytes {
             Bytes::new()
         }
@@ -157,6 +161,8 @@ mod tests {
 
     #[test]
     fn test_basic_message() -> std::io::Result<()> {
+        test_utils::init_test_log!();
+
         let client = {
             let network = Network::run_daemon();
             let name = "test-basic-message";
@@ -168,7 +174,7 @@ mod tests {
             let raft = Arc::new(Raft::new(
                 vec![RpcClient(client)],
                 0,
-                Arc::new(()),
+                Arc::new(DoNothingPersister),
                 |_: ApplyCommandMessage<i32>| {},
                 None,
                 Raft::<i32>::NO_SNAPSHOT,
@@ -182,30 +188,20 @@ mod tests {
         };
 
         let rpc_client = RpcClient(client);
-        let request = RequestVoteArgs {
-            term: Term(2021),
-
-            candidate_id: Peer(0),
-            last_log_index: 0,
-            last_log_term: Term(0),
-        };
+        let request = make_request_vote_args(Term(2021), 0, 0, Term(0));
         let response = futures::executor::block_on(
             (&rpc_client as &dyn RemoteRaft<i32>).request_vote(request),
         )?;
-        assert!(response.vote_granted);
-
-        let request = AppendEntriesArgs::<i32> {
-            term: Term(2021),
-            leader_id: Peer(0),
-            prev_log_index: 0,
-            prev_log_term: Term(0),
-            entries: vec![],
-            leader_commit: 0,
-        };
+        let (_, vote_granted) = unpack_request_vote_reply(response);
+        assert!(vote_granted);
+
+        let request =
+            make_append_entries_args::<i32>(Term(2021), 0, 0, Term(0), 0);
         let response =
             futures::executor::block_on(rpc_client.append_entries(request))?;
-        assert_eq!(2021, response.term.0);
-        assert!(response.success);
+        let (Term(term), success) = unpack_append_entries_reply(response);
+        assert_eq!(2021, term);
+        assert!(success);
 
         Ok(())
     }

+ 2 - 2
tests/config/mod.rs

@@ -10,8 +10,8 @@ use parking_lot::Mutex;
 use rand::{thread_rng, Rng};
 use tokio::time::Duration;
 
-use ruaft::rpcs::register_server;
 use ruaft::{ApplyCommandMessage, Persister, Raft, Term};
+use test_configs::register_server;
 
 pub mod persister;
 
@@ -304,7 +304,7 @@ impl Config {
         {
             let mut network = self.network.lock();
             for j in 0..self.server_count {
-                clients.push(ruaft::rpcs::RpcClient::new(network.make_client(
+                clients.push(test_configs::RpcClient::new(network.make_client(
                     Self::client_name(index, j),
                     Self::server_name(j),
                 )))