Parcourir la source

Decouple kvraft from labrpc.

Jing Yang il y a 4 ans
Parent
commit
974f3abb0b

+ 0 - 1
kvraft/Cargo.toml

@@ -6,7 +6,6 @@ edition = "2018"
 [dependencies]
 async-trait = "0.1"
 bincode = "1.3.3"
-labrpc = "0.1.12"
 parking_lot = "0.11.1"
 rand = "0.8"
 ruaft = { path = "..", features = ["integration-test"] }

+ 10 - 8
kvraft/src/client.rs

@@ -2,7 +2,6 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Once;
 use std::time::Duration;
 
-use labrpc::{Client, RequestMessage};
 use serde::de::DeserializeOwned;
 use serde::Serialize;
 
@@ -11,6 +10,7 @@ use crate::common::{
     PutAppendReply, UniqueIdSequence, GET, PUT_APPEND,
 };
 use crate::common::{KVError, ValidReply};
+use crate::RemoteKvraft;
 
 pub struct Clerk {
     init: Once,
@@ -18,7 +18,7 @@ pub struct Clerk {
 }
 
 impl Clerk {
-    pub fn new(servers: Vec<Client>) -> Self {
+    pub fn new(servers: Vec<impl RemoteKvraft>) -> Self {
         Self {
             init: Once::new(),
             inner: ClerkInner::new(servers),
@@ -64,7 +64,7 @@ impl Clerk {
 }
 
 pub struct ClerkInner {
-    servers: Vec<Client>,
+    servers: Vec<Box<dyn RemoteKvraft>>,
 
     last_server_index: AtomicUsize,
     unique_id: UniqueIdSequence,
@@ -73,7 +73,11 @@ pub struct ClerkInner {
 }
 
 impl ClerkInner {
-    pub fn new(servers: Vec<Client>) -> Self {
+    pub fn new(servers: Vec<impl RemoteKvraft>) -> Self {
+        let servers = servers
+            .into_iter()
+            .map(|s| Box::new(s) as Box<dyn RemoteKvraft>)
+            .collect();
         Self {
             servers,
 
@@ -128,10 +132,8 @@ impl ClerkInner {
         R: DeserializeOwned + ValidReply,
     {
         let method = method.as_ref().to_owned();
-        let data = RequestMessage::from(
-            bincode::serialize(&args)
-                .expect("Serialization of requests should not fail"),
-        );
+        let data = bincode::serialize(&args)
+            .expect("Serialization of requests should not fail");
 
         let max_retry =
             std::cmp::max(max_retry.unwrap_or(usize::MAX), self.servers.len());

+ 2 - 0
kvraft/src/lib.rs

@@ -1,9 +1,11 @@
 pub use client::Clerk;
 pub use common::{GET, PUT_APPEND};
+pub use remote_kvraft::RemoteKvraft;
 pub use server::KVServer;
 
 mod client;
 mod common;
 mod server;
 
+mod remote_kvraft;
 mod snapshot_holder;

+ 10 - 0
kvraft/src/remote_kvraft.rs

@@ -0,0 +1,10 @@
+use async_trait::async_trait;
+
+#[async_trait]
+pub trait RemoteKvraft: Send + Sync + 'static {
+    async fn call_rpc(
+        &self,
+        method: String,
+        request: Vec<u8>,
+    ) -> std::io::Result<Vec<u8>>;
+}

+ 3 - 3
test_configs/src/kvraft/config.rs

@@ -5,7 +5,7 @@ use parking_lot::Mutex;
 use rand::seq::SliceRandom;
 use rand::thread_rng;
 
-use crate::{register_kv_server, register_server, Persister};
+use crate::{register_kv_server, register_server, Persister, RpcClient};
 
 use kvraft::Clerk;
 use kvraft::KVServer;
@@ -196,10 +196,10 @@ impl Config {
         {
             let mut network = self.network.lock();
             for j in 0..self.server_count {
-                clients.push(network.make_client(
+                clients.push(RpcClient::new(network.make_client(
                     Self::kv_clerk_name(clerk_index, j),
                     Self::kv_server_name(j),
-                ));
+                )));
             }
             // Disable clerk connection to all kv servers.
             Self::set_clerk_connect(

+ 15 - 1
test_configs/src/rpcs.rs

@@ -4,7 +4,7 @@ use parking_lot::Mutex;
 use serde::de::DeserializeOwned;
 use serde::Serialize;
 
-use kvraft::{KVServer, GET, PUT_APPEND};
+use kvraft::{KVServer, RemoteKvraft, GET, PUT_APPEND};
 use ruaft::{
     AppendEntriesArgs, AppendEntriesReply, InstallSnapshotArgs,
     InstallSnapshotReply, Raft, RequestVoteArgs, RequestVoteReply,
@@ -69,6 +69,20 @@ impl<Command: 'static + Send + Serialize> ruaft::RemoteRaft<Command>
     }
 }
 
+#[async_trait]
+impl RemoteKvraft for RpcClient {
+    async fn call_rpc(
+        &self,
+        method: String,
+        request: Vec<u8>,
+    ) -> std::io::Result<Vec<u8>> {
+        self.0
+            .call_rpc(method, RequestMessage::from(request))
+            .await
+            .map(|data| data.to_vec())
+    }
+}
+
 pub fn make_rpc_handler<Request, Reply, F>(
     func: F,
 ) -> Box<dyn Fn(RequestMessage) -> ReplyMessage>