Przeglądaj źródła

Merge branch 'durio'

Adds a KV store that runs as a web service, and scripts to deploy it
onto Raspberry Pis.
Jing Yang 4 lat temu
rodzic
commit
67a6c6cc5a

+ 10 - 1
durio/Cargo.toml

@@ -13,13 +13,22 @@ keywords = ["raft"]
 categories = ["raft"]
 
 [dependencies]
+async-trait = "0.1"
+bytes = "1.0"
+crossbeam-channel = "0.5.1"
+futures-util = "0.3.15"
 kvraft = { path = "../kvraft" }
+lazy_static = "1.4.0"
+log = "0.4"
+parking_lot = "0.11.1"
 ruaft = { path = "..", features = ["integration-test"] }
 serde = "1.0"
 serde_derive = "1.0"
 serde_json = "1.0"
-tarpc = "0.27"
+tarpc = { version = "0.27", features = ["serde-transport", "tcp"] }
+test_utils = { path = "../test_utils", default-features = false }
 tokio = { version = "1.7", features = ["macros", "rt-multi-thread", "time", "parking_lot"] }
+tokio-serde = { version = "0.8", features = ["json"] }
 warp = "0.3"
 
 [dev-dependencies]

+ 30 - 0
durio/build.sh

@@ -0,0 +1,30 @@
+#!/usr/bin/env bash
+# To setup cross build on a linux machine, do the following
+# 1. Install package arm-linux-gnu-gcc (yum) or gcc-arm-linux-gnueabihf (apt).
+# 2. Run `rustup target add armv7-unknown-linux-musleabihf`
+# 3. Add the following to ~/.cargo/config
+# [target.armv7-unknown-linux-musleabihf]
+# linker = "arm-linux-gnu-gcc"
+
+set -ex
+rsync -av /Users/ditsing/Code/ruaft ec2:~/compile/ --exclude 'ruaft/target' --exclude 'ruaft/.git' --exclude '.idea'
+ssh ec2 'cd ~/compile/ruaft/durio && cargo build --target=armv7-unknown-linux-musleabihf --release'
+mkdir -p /tmp/ruaft
+rsync -av 'ec2:~/compile/ruaft/target/armv7-unknown-linux-musleabihf/release/durio' '/tmp/ruaft/durio'
+
+ssh alice 'pkill -9 durio || echo nothing'
+rsync -av '/tmp/ruaft/durio' alice:/tmp/durio
+ssh alice '
+  RUST_LOG=warp,tarpc=error,ruaft=debug,kvraft=debug,durio nohup /tmp/durio 1 1>>/tmp/durio.out 2>>/tmp/durio.err &
+'
+
+ssh bob 'pkill -9 durio || echo nothing'
+rsync -av '/tmp/ruaft/durio' bob:/tmp/durio
+ssh bob '
+  RUST_LOG=warp,tarpc=error,ruaft=debug,kvraft=debug,durio nohup /tmp/durio 2 1>>/tmp/durio.out 2>>/tmp/durio.err &
+'
+
+RUST_LOG=warp,tarpc=error,ruaft=debug,kvraft=debug,durio cargo run 0 || echo "Done"
+
+ssh alice 'pkill -9 durio || echo nothing'
+ssh bob 'pkill -9 durio || echo nothing'

+ 6 - 0
durio/curl.sh

@@ -0,0 +1,6 @@
+curl -i -X POST -H 'Content-Type: application/json' -d '{"key": "hi", "value": "Hello "}' http://10.1.1.51:9007/kvstore/put
+curl -i -X GET http://10.1.1.56:9008/kvstore/get/hi
+curl -i -X POST -H 'Content-Type: application/json' -d '{"key": "hi", "value": "World!"}' http://10.1.1.198:9006/kvstore/append
+curl -i -X GET http://10.1.1.51:9007/kvstore/get/hi
+curl -i -X POST -H 'Content-Type: application/json' -d '{"key": "hi", "value": "      "}' http://10.1.1.56:9008/kvstore/put
+curl -i -X GET http://10.1.1.198:9006/kvstore/get/hi

+ 14 - 0
durio/pi-setup-notes.md

@@ -0,0 +1,14 @@
+1. Flash SD card with the newest Lite OS.
+2. Touch /boot/ssh on the pi.
+3. Connect pi to the Internet router via a network cable.
+4. Connect dev machine to same router.
+5. ssh pi@`the ip address` with password 'raspberry'. 
+   1. Change the password. 
+   2. Change hostname to pi-`nextname`.
+   3. Connect the pi to the Raft router via wireless.
+   4. Run `sudo apt-get update`.
+   5. Reboot.
+6. Run `ssh-keygen -t ed25519 -f .ssh/pi-nextname`
+7. Copy SSH key pub file to pi.
+   1. `mkdir -p .ssh`
+   2. `mv pi-nextname.pub .ssh/authorized_keys`

+ 74 - 0
durio/src/kv_service.rs

@@ -0,0 +1,74 @@
+use std::future::Future;
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use tarpc::context::Context;
+
+use kvraft::{
+    GetArgs, GetReply, KVServer, PutAppendArgs, PutAppendReply, RemoteKvraft,
+};
+
+#[tarpc::service]
+pub(crate) trait KVService {
+    async fn get(args: GetArgs) -> GetReply;
+    async fn put_append(args: PutAppendArgs) -> PutAppendReply;
+}
+
+#[derive(Clone)]
+struct KVRpcServer(Arc<KVServer>);
+
+#[tarpc::server]
+impl KVService for KVRpcServer {
+    async fn get(self, _context: Context, args: GetArgs) -> GetReply {
+        self.0.get(args).await
+    }
+
+    async fn put_append(
+        self,
+        _context: Context,
+        args: PutAppendArgs,
+    ) -> PutAppendReply {
+        self.0.put_append(args).await
+    }
+}
+
+#[async_trait]
+impl RemoteKvraft for KVServiceClient {
+    async fn get(&self, args: GetArgs) -> std::io::Result<GetReply> {
+        self.get(Context::current(), args)
+            .await
+            .map_err(crate::utils::translate_rpc_error)
+    }
+
+    async fn put_append(
+        &self,
+        args: PutAppendArgs,
+    ) -> std::io::Result<PutAppendReply> {
+        self.put_append(Context::current(), args)
+            .await
+            .map_err(crate::utils::translate_rpc_error)
+    }
+}
+
+#[allow(dead_code)]
+pub(crate) async fn connect_to_kv_service(
+    addr: SocketAddr,
+) -> std::io::Result<KVServiceClient> {
+    let conn = tarpc::serde_transport::tcp::connect(
+        addr,
+        tokio_serde::formats::Json::default,
+    )
+    .await?;
+    let client =
+        KVServiceClient::new(tarpc::client::Config::default(), conn).spawn();
+    Ok(client)
+}
+
+pub(crate) fn start_kv_service_server(
+    addr: SocketAddr,
+    kv_server: Arc<KVServer>,
+) -> impl Future<Output = std::io::Result<()>> {
+    let server = KVRpcServer(kv_server);
+    crate::utils::start_tarpc_server(addr, server.serve())
+}

+ 126 - 14
durio/src/main.rs

@@ -1,33 +1,145 @@
+use std::net::SocketAddr;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 
+use kvraft::KVServer;
+use lazy_static::lazy_static;
 use serde_derive::{Deserialize, Serialize};
 use warp::Filter;
 
+use crate::one_clerk::create_clerk;
+use crate::run::run_kv_instance;
+
+mod kv_service;
+mod one_clerk;
+mod persister;
+mod raft_service;
+mod run;
+mod utils;
+
 #[derive(Deserialize, Serialize)]
 struct PutAppendBody {
+    key: String,
     value: String,
 }
 
-#[tokio::main]
-async fn main() {
+const IP_ONE: [u8; 4] = [10, 1, 1, 198];
+const IP_TWO: [u8; 4] = [10, 1, 1, 51];
+const IP_THREE: [u8; 4] = [10, 1, 1, 56];
+
+lazy_static! {
+    static ref KV_ADDRS: Vec<SocketAddr> = vec![
+        (IP_ONE, 9986).into(),
+        (IP_TWO, 9987).into(),
+        (IP_THREE, 9988).into(),
+    ];
+    static ref RAFT_ADDRS: Vec<SocketAddr> = vec![
+        (IP_ONE, 10006).into(),
+        (IP_TWO, 10007).into(),
+        (IP_THREE, 10008).into(),
+    ];
+    static ref WEB_ADDRS: Vec<SocketAddr> = vec![
+        ([0, 0, 0, 0], 9006).into(),
+        ([0, 0, 0, 0], 9007).into(),
+        ([0, 0, 0, 0], 9008).into(),
+    ];
+}
+
+const NOT_READY: &str = "Clerk is not ready";
+
+async fn run_web_server(socket_addr: SocketAddr, kv_server: Arc<KVServer>) {
+    let is_leader = warp::get()
+        .and(warp::path!("kvstore" / "is_leader"))
+        .map(move || format!("{:?}", kv_server.raft().get_state()));
+
     let counter = Arc::new(AtomicUsize::new(0));
-    let counter_clone = counter.clone();
-    let get = warp::path!("kvstore" / "get" / String).map(move |key| {
-        key + "!" + counter.fetch_add(1, Ordering::SeqCst).to_string().as_str()
-    });
+    let counter_2 = counter.clone();
+    let counter_3 = counter.clone();
+    let clerk = create_clerk(KV_ADDRS.clone());
+
+    let get_clerk = clerk.clone();
+    let get = warp::get()
+        .and(warp::path!("kvstore" / "get" / String))
+        .map(move |key: String| {
+            let counter = counter.fetch_add(1, Ordering::SeqCst).to_string();
+            match get_clerk.get(key.clone()) {
+                Some(value) => {
+                    key + "!" + counter.as_str() + "!" + value.as_str()
+                }
+                None => NOT_READY.to_string(),
+            }
+        });
+
+    let put_clerk = clerk.clone();
     let put = warp::post()
-        .and(warp::path!("kvstore" / "put" / String))
+        .and(warp::path!("kvstore" / "put"))
         .and(warp::body::json())
-        .map(move |key, _body: PutAppendBody| {
-            counter_clone.fetch_add(1, Ordering::SeqCst);
-            warp::reply::json(&key)
+        .map(move |body: PutAppendBody| {
+            counter_2.fetch_add(1, Ordering::SeqCst);
+            let code = match put_clerk.put(body.key, body.value) {
+                None => warp::http::StatusCode::SERVICE_UNAVAILABLE,
+                Some(_) => warp::http::StatusCode::OK,
+            };
+            warp::reply::with_status(warp::reply(), code)
         });
+    let append_clerk = clerk.clone();
     let append = warp::post()
-        .and(warp::path!("kvstore" / "append" / String))
+        .and(warp::path!("kvstore" / "append"))
         .and(warp::body::json())
-        .map(|key, _body: PutAppendBody| warp::reply::json(&key));
+        .map(move |body: PutAppendBody| {
+            counter_3.fetch_add(1, Ordering::SeqCst);
+            let code = match append_clerk.append(body.key, body.value) {
+                None => warp::http::StatusCode::SERVICE_UNAVAILABLE,
+                Some(_) => warp::http::StatusCode::OK,
+            };
+            warp::reply::with_status(warp::reply(), code)
+        });
+
+    let routes = is_leader.or(get).or(put).or(append);
+    warp::serve(routes).run(socket_addr).await;
+}
+
+fn main() {
+    let me: usize = std::env::args()
+        .nth(1)
+        .unwrap_or_default()
+        .parse()
+        .expect("An index of the current instance must be passed in");
+    test_utils::init_log(format!("durio-instance-{}", me).as_str())
+        .expect("Initiating log should not fail");
+
+    // Run RPC servers in a thread pool. This pool
+    // 1. Accepts incoming RPC connections for KV and Raft servers.
+    // 2. Sends out RPCs to other Raft instances.
+    // Timers are used by RPC handling code in the KV server.
+    let local_logger = test_utils::thread_local_logger::get();
+    let rpc_server_thread_pool = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .thread_name("durio-rpc")
+        .on_thread_start(move || {
+            test_utils::thread_local_logger::set(local_logger.clone())
+        })
+        .build()
+        .expect("Creating thread pool should not fail");
+
+    let kv_server = rpc_server_thread_pool.block_on(async {
+        run_kv_instance(KV_ADDRS[me], RAFT_ADDRS.clone(), me)
+            .await
+            .expect("Running kv instance should not fail")
+    });
+
+    // Run web servers in a thread pool. This pool
+    // 1. Accepts incoming HTTP connections.
+    // 2. Sends out RPCs to KV instances, both local and remote.
+    let local_logger = test_utils::thread_local_logger::get();
+    let thread_pool = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .thread_name("durio-web")
+        .on_thread_start(move || {
+            test_utils::thread_local_logger::set(local_logger.clone())
+        })
+        .build()
+        .expect("Creating thread pool should not fail");
 
-    let routes = warp::get().and(get.or(put).or(append));
-    warp::serve(routes).run(([0, 0, 0, 0], 9090)).await;
+    thread_pool.block_on(run_web_server(WEB_ADDRS[me], kv_server));
 }

+ 125 - 0
durio/src/one_clerk.rs

@@ -0,0 +1,125 @@
+use std::net::SocketAddr;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::mpsc::Sender;
+use std::sync::Arc;
+
+use crate::kv_service::connect_to_kv_service;
+use kvraft::Clerk;
+
+pub(crate) fn create_clerk(socket_addrs: Vec<SocketAddr>) -> OneClerk {
+    OneClerk::create(socket_addrs)
+}
+
+#[derive(Clone)]
+pub(crate) struct OneClerk {
+    ready: Arc<AtomicBool>,
+    requests: crossbeam_channel::Sender<(ClerkRequest, Sender<String>)>,
+}
+
+enum ClerkRequest {
+    Get(String),
+    Put(String, String),
+    Append(String, String),
+}
+
+impl OneClerk {
+    pub(crate) fn create(socket_addrs: Vec<SocketAddr>) -> Self {
+        let ready = Arc::new(AtomicBool::new(false));
+        // Create a thread that blocks on all requests to the clerk.
+        let requests = Self::run_clerk_thread(socket_addrs, ready.clone());
+
+        OneClerk { ready, requests }
+    }
+
+    async fn initialize_clerk(socket_addrs: Vec<SocketAddr>) -> Clerk {
+        log::info!("Starting clerk creation ...");
+        let mut clients = vec![None; socket_addrs.len()];
+        while clients.iter().filter(|e| e.is_none()).count() != 0 {
+            for (index, socket_addr) in socket_addrs.iter().enumerate() {
+                let result = connect_to_kv_service(*socket_addr).await;
+                match result {
+                    Ok(client) => clients[index] = Some(client),
+                    Err(e) => log::error!(
+                        "Error connecting to {:?}: {}",
+                        socket_addr,
+                        e
+                    ),
+                }
+            }
+            log::info!("Clerk clients are {:?}", clients);
+        }
+
+        log::info!("Done clerk creation ...");
+        let clients = clients.into_iter().map(|e| e.unwrap()).collect();
+        Clerk::new(clients)
+    }
+
+    /// A thread must be created for get requests. We cannot run the blocking
+    /// Clerk functions on tokio thread pool threads.
+    fn run_clerk_thread(
+        socket_addrs: Vec<SocketAddr>,
+        ready: Arc<AtomicBool>,
+    ) -> crossbeam_channel::Sender<(ClerkRequest, Sender<String>)> {
+        let local_logger =
+            test_utils::thread_local_logger::LocalLogger::inherit();
+
+        // Steal a tokio runtime to run the initializer.
+        let tokio_handle = tokio::runtime::Handle::current();
+        let (tx, rx) =
+            crossbeam_channel::unbounded::<(ClerkRequest, Sender<String>)>();
+        std::thread::spawn(move || {
+            local_logger.attach();
+
+            let mut clerk =
+                tokio_handle.block_on(Self::initialize_clerk(socket_addrs));
+            clerk.init_once();
+
+            ready.store(true, Ordering::Release);
+
+            while let Ok((request, result)) = rx.recv() {
+                let value = match request {
+                    ClerkRequest::Get(key) => {
+                        clerk.get(key).unwrap_or_default()
+                    }
+                    ClerkRequest::Put(key, value) => {
+                        clerk.put(key, value);
+                        String::default()
+                    }
+                    ClerkRequest::Append(key, value) => {
+                        clerk.append(key, value);
+                        String::default()
+                    }
+                };
+                let _ = result.send(value);
+            }
+        });
+        tx
+    }
+
+    fn request(&self, request: ClerkRequest) -> Option<String> {
+        if !self.ready.load(Ordering::Acquire) {
+            return None;
+        }
+
+        let (result_tx, result_rx) = std::sync::mpsc::channel();
+        self.requests
+            .send((request, result_tx))
+            .expect("Send get request should not fail");
+        let value = result_rx
+            .recv()
+            .expect("Receiving get response should not fail");
+        Some(value)
+    }
+
+    pub(crate) fn get(&self, key: String) -> Option<String> {
+        self.request(ClerkRequest::Get(key))
+    }
+
+    pub(crate) fn put(&self, key: String, value: String) -> Option<String> {
+        self.request(ClerkRequest::Put(key, value))
+    }
+
+    pub(crate) fn append(&self, key: String, value: String) -> Option<String> {
+        self.request(ClerkRequest::Append(key, value))
+    }
+}

+ 48 - 0
durio/src/persister.rs

@@ -0,0 +1,48 @@
+use parking_lot::Mutex;
+
+#[derive(Clone)]
+pub struct State {
+    pub bytes: bytes::Bytes,
+    pub snapshot: Vec<u8>,
+}
+
+pub struct Persister {
+    state: Mutex<State>,
+}
+
+impl Persister {
+    pub fn new() -> Self {
+        Self {
+            state: Mutex::new(State {
+                bytes: bytes::Bytes::new(),
+                snapshot: vec![],
+            }),
+        }
+    }
+}
+
+impl Default for Persister {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ruaft::Persister for Persister {
+    fn read_state(&self) -> bytes::Bytes {
+        self.state.lock().bytes.clone()
+    }
+
+    fn save_state(&self, data: bytes::Bytes) {
+        self.state.lock().bytes = data;
+    }
+
+    fn state_size(&self) -> usize {
+        self.state.lock().bytes.len()
+    }
+
+    fn save_snapshot_and_state(&self, state: bytes::Bytes, snapshot: &[u8]) {
+        let mut this = self.state.lock();
+        this.bytes = state;
+        this.snapshot = snapshot.to_vec();
+    }
+}

+ 132 - 0
durio/src/raft_service.rs

@@ -0,0 +1,132 @@
+use std::future::Future;
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use tarpc::context::Context;
+
+use kvraft::UniqueKVOp;
+use ruaft::{
+    AppendEntriesArgs, AppendEntriesReply, InstallSnapshotArgs,
+    InstallSnapshotReply, Raft, RemoteRaft, RequestVoteArgs, RequestVoteReply,
+};
+
+#[tarpc::service]
+pub(crate) trait RaftService {
+    async fn append_entries(
+        args: AppendEntriesArgs<UniqueKVOp>,
+    ) -> AppendEntriesReply;
+    async fn install_snapshot(
+        args: InstallSnapshotArgs,
+    ) -> InstallSnapshotReply;
+    async fn request_vote(args: RequestVoteArgs) -> RequestVoteReply;
+}
+
+#[derive(Clone)]
+struct RaftRpcServer(Arc<Raft<UniqueKVOp>>);
+
+#[tarpc::server]
+impl RaftService for RaftRpcServer {
+    async fn append_entries(
+        self,
+        _context: Context,
+        args: AppendEntriesArgs<UniqueKVOp>,
+    ) -> AppendEntriesReply {
+        self.0.process_append_entries(args)
+    }
+
+    async fn install_snapshot(
+        self,
+        _context: Context,
+        args: InstallSnapshotArgs,
+    ) -> InstallSnapshotReply {
+        self.0.process_install_snapshot(args)
+    }
+
+    async fn request_vote(
+        self,
+        _context: Context,
+        args: RequestVoteArgs,
+    ) -> RequestVoteReply {
+        self.0.process_request_vote(args)
+    }
+}
+
+pub(crate) struct LazyRaftServiceClient {
+    socket_addr: SocketAddr,
+    once_cell: tokio::sync::OnceCell<RaftServiceClient>,
+}
+
+impl LazyRaftServiceClient {
+    pub(crate) fn new(socket_addr: SocketAddr) -> Self {
+        Self {
+            socket_addr,
+            once_cell: tokio::sync::OnceCell::new(),
+        }
+    }
+
+    pub(crate) async fn get_or_try_init(
+        &self,
+    ) -> std::io::Result<&RaftServiceClient> {
+        self.once_cell
+            .get_or_try_init(|| connect_to_raft_service(self.socket_addr))
+            .await
+    }
+}
+
+#[async_trait]
+impl RemoteRaft<UniqueKVOp> for LazyRaftServiceClient {
+    async fn request_vote(
+        &self,
+        args: RequestVoteArgs,
+    ) -> std::io::Result<RequestVoteReply> {
+        self.get_or_try_init()
+            .await?
+            .request_vote(Context::current(), args)
+            .await
+            .map_err(crate::utils::translate_rpc_error)
+    }
+
+    async fn append_entries(
+        &self,
+        args: AppendEntriesArgs<UniqueKVOp>,
+    ) -> std::io::Result<AppendEntriesReply> {
+        self.get_or_try_init()
+            .await?
+            .append_entries(Context::current(), args)
+            .await
+            .map_err(crate::utils::translate_rpc_error)
+    }
+
+    async fn install_snapshot(
+        &self,
+        args: InstallSnapshotArgs,
+    ) -> std::io::Result<InstallSnapshotReply> {
+        self.get_or_try_init()
+            .await?
+            .install_snapshot(Context::current(), args)
+            .await
+            .map_err(crate::utils::translate_rpc_error)
+    }
+}
+
+pub(crate) async fn connect_to_raft_service(
+    addr: SocketAddr,
+) -> std::io::Result<RaftServiceClient> {
+    let conn = tarpc::serde_transport::tcp::connect(
+        addr,
+        tokio_serde::formats::Json::default,
+    )
+    .await?;
+    let client =
+        RaftServiceClient::new(tarpc::client::Config::default(), conn).spawn();
+    Ok(client)
+}
+
+pub(crate) fn start_raft_service_server(
+    addr: SocketAddr,
+    raft: Arc<Raft<UniqueKVOp>>,
+) -> impl Future<Output = std::io::Result<()>> {
+    let server = RaftRpcServer(raft);
+    crate::utils::start_tarpc_server(addr, server.serve())
+}

+ 31 - 0
durio/src/run.rs

@@ -0,0 +1,31 @@
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use kvraft::KVServer;
+
+use crate::kv_service::start_kv_service_server;
+use crate::persister::Persister;
+use crate::raft_service::{start_raft_service_server, LazyRaftServiceClient};
+
+pub(crate) async fn run_kv_instance(
+    addr: SocketAddr,
+    raft_peers: Vec<SocketAddr>,
+    me: usize,
+) -> std::io::Result<Arc<KVServer>> {
+    let local_raft_peer = raft_peers[me];
+
+    let mut remote_rafts = vec![];
+    for raft_peer in raft_peers {
+        remote_rafts.push(LazyRaftServiceClient::new(raft_peer));
+    }
+
+    let persister = Arc::new(Persister::new());
+
+    let kv_server = KVServer::new(remote_rafts, me, persister, None);
+    let raft = Arc::new(kv_server.raft().clone());
+
+    start_raft_service_server(local_raft_peer, raft).await?;
+    start_kv_service_server(addr, kv_server.clone()).await?;
+
+    Ok(kv_server)
+}

+ 47 - 0
durio/src/utils.rs

@@ -0,0 +1,47 @@
+use std::io::ErrorKind;
+use std::net::SocketAddr;
+
+use futures_util::StreamExt;
+use tarpc::client::RpcError;
+use tarpc::server::{Channel, Serve};
+
+pub(crate) fn translate_rpc_error(e: RpcError) -> std::io::Error {
+    match e {
+        RpcError::Disconnected => std::io::Error::new(ErrorKind::BrokenPipe, e),
+        RpcError::DeadlineExceeded => {
+            std::io::Error::new(ErrorKind::TimedOut, e)
+        }
+        RpcError::Server(server_error) => {
+            std::io::Error::new(ErrorKind::Other, server_error)
+        }
+    }
+}
+
+pub(crate) async fn start_tarpc_server<Request, Reply, ServeFn>(
+    addr: SocketAddr,
+    serve: ServeFn,
+) -> std::io::Result<()>
+where
+    Request: Send + 'static + serde::de::DeserializeOwned,
+    Reply: Send + 'static + serde::ser::Serialize,
+    ServeFn:
+        tarpc::server::Serve<Request, Resp = Reply> + Send + 'static + Clone,
+    <ServeFn as Serve<Request>>::Fut: Send,
+{
+    let mut listener = tarpc::serde_transport::tcp::listen(
+        addr,
+        tokio_serde::formats::Json::default,
+    )
+    .await?;
+
+    tokio::spawn(async move {
+        while let Some(conn) = listener.next().await {
+            if let Ok(conn) = conn {
+                let channel = tarpc::server::BaseChannel::with_defaults(conn)
+                    .max_concurrent_requests(1);
+                tokio::spawn(channel.execute(serve.clone()));
+            }
+        }
+    });
+    Ok(())
+}

+ 1 - 0
kvraft/src/lib.rs

@@ -2,6 +2,7 @@ pub use client::Clerk;
 pub use common::{GetArgs, GetReply, PutAppendArgs, PutAppendReply};
 pub use remote_kvraft::RemoteKvraft;
 pub use server::KVServer;
+pub use server::UniqueKVOp;
 
 mod client;
 mod common;

+ 2 - 2
kvraft/src/server.rs

@@ -27,7 +27,7 @@ pub struct KVServer {
     logger: LocalLogger,
 }
 
-#[derive(Clone, Default, Serialize, Deserialize)]
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
 pub struct UniqueKVOp {
     op: KVOp,
     me: usize,
@@ -51,7 +51,7 @@ struct KVServerState {
     >,
 }
 
-#[derive(Clone, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 enum KVOp {
     NoOp,
     Get(String),

+ 1 - 0
src/lib.rs

@@ -184,6 +184,7 @@ where
         let thread_env = daemon_env.for_thread();
         let thread_pool = tokio::runtime::Builder::new_multi_thread()
             .enable_time()
+            .enable_io()
             .thread_name(format!("raft-instance-{}", me))
             .worker_threads(peer_size)
             .on_thread_start(move || thread_env.clone().attach())

+ 1 - 1
src/persister.rs

@@ -2,7 +2,7 @@ use std::convert::TryFrom;
 
 use bytes::Bytes;
 use serde::de::DeserializeOwned;
-use serde::Serialize;
+use serde::ser::Serialize;
 use serde_derive::{Deserialize, Serialize};
 
 use crate::log_array::LogArray;