浏览代码

Add code to register KV server with the network.

Jing Yang 5 年之前
父节点
当前提交
262235cc56
共有 3 个文件被更改,包括 68 次插入0 次删除
  1. 12 0
      kvraft/src/testing_utils/config.rs
  2. 1 0
      kvraft/src/testing_utils/mod.rs
  3. 55 0
      kvraft/src/testing_utils/rpcs.rs

+ 12 - 0
kvraft/src/testing_utils/config.rs

@@ -6,6 +6,7 @@ use ruaft::RpcClient;
 use server::KVServer;
 use std::sync::Arc;
 use testing_utils::memory_persister::MemoryStorage;
+use testing_utils::rpcs::register_kv_server;
 
 struct ConfigState {
     kv_servers: Vec<Option<Arc<KVServer>>>,
@@ -21,6 +22,10 @@ pub struct Config {
 }
 
 impl Config {
+    fn kv_server_name(i: usize) -> String {
+        format!("kv-server-{}", i)
+    }
+
     fn server_name(i: usize) -> String {
         format!("kvraft-server-{}", i)
     }
@@ -47,7 +52,14 @@ impl Config {
         self.state.lock().kv_servers[index].replace(kv.clone());
 
         let raft = std::rc::Rc::new(kv.raft());
+
         register_server(raft, Self::server_name(index), self.network.as_ref())?;
+
+        register_kv_server(
+            kv,
+            Self::kv_server_name(index),
+            self.network.as_ref(),
+        )?;
         Ok(())
     }
 }

+ 1 - 0
kvraft/src/testing_utils/mod.rs

@@ -1,2 +1,3 @@
 mod config;
 mod memory_persister;
+mod rpcs;

+ 55 - 0
kvraft/src/testing_utils/rpcs.rs

@@ -0,0 +1,55 @@
+use common::{GET, PUT_APPEND};
+use labrpc::{Network, ReplyMessage, RequestMessage, Server};
+use parking_lot::Mutex;
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+use server::KVServer;
+
+fn make_rpc_handler<Request, Reply, F>(
+    func: F,
+) -> Box<dyn Fn(RequestMessage) -> ReplyMessage>
+where
+    Request: DeserializeOwned,
+    Reply: Serialize,
+    F: 'static + Fn(Request) -> Reply,
+{
+    Box::new(move |request| {
+        let reply = func(
+            bincode::deserialize(&request)
+                .expect("Deserialization should not fail"),
+        );
+
+        ReplyMessage::from(
+            bincode::serialize(&reply).expect("Serialization should not fail"),
+        )
+    })
+}
+
+pub fn register_kv_server<
+    KV: 'static + AsRef<KVServer> + Clone,
+    S: AsRef<str>,
+>(
+    kv: KV,
+    name: S,
+    network: &Mutex<Network>,
+) -> std::io::Result<()> {
+    let mut network = network.lock();
+    let server_name = name.as_ref();
+    let mut server = Server::make_server(server_name);
+
+    let kv_clone = kv.clone();
+    server.register_rpc_handler(
+        GET.to_owned(),
+        make_rpc_handler(move |args| kv_clone.as_ref().get(args)),
+    )?;
+
+    let kv_clone = kv.clone();
+    server.register_rpc_handler(
+        PUT_APPEND.to_owned(),
+        make_rpc_handler(move |args| kv_clone.as_ref().put_append(args)),
+    )?;
+
+    network.add_server(server_name, server);
+
+    Ok(())
+}