Pārlūkot izejas kodu

Add test persister, test config and make_config() function.

Jing Yang 4 gadi atpakaļ
vecāks
revīzija
dc1844486e

+ 1 - 0
kvraft/Cargo.toml

@@ -3,6 +3,7 @@ name = "kvraft"
 version = "0.1.0"
 
 [dependencies]
+anyhow = "1.0"
 bincode = "1.3.1"
 bytes = "1.0"
 labrpc = { path = "../../labrpc" }

+ 3 - 0
kvraft/src/lib.rs

@@ -1,3 +1,4 @@
+extern crate anyhow;
 extern crate labrpc;
 extern crate parking_lot;
 extern crate rand;
@@ -9,3 +10,5 @@ extern crate serde_derive;
 mod client;
 mod common;
 mod server;
+
+mod testing_utils;

+ 6 - 2
kvraft/src/server.rs

@@ -9,7 +9,7 @@ use std::sync::mpsc::{channel, Receiver};
 use std::sync::Arc;
 use std::time::Duration;
 
-struct KVServer {
+pub struct KVServer {
     state: Mutex<KVServerState>,
     rf: Mutex<Raft<UniqueKVOp>>,
     // snapshot
@@ -18,7 +18,7 @@ struct KVServer {
 type IndexedCommand = (usize, UniqueKVOp);
 
 #[derive(Clone, Default, Serialize, Deserialize)]
-struct UniqueKVOp {
+pub struct UniqueKVOp {
     op: KVOp,
     unique_id: UniqueId,
 }
@@ -352,6 +352,10 @@ impl KVServer {
         PutAppendReply { result }
     }
 
+    pub fn raft(&self) -> Raft<UniqueKVOp> {
+        self.rf.lock().clone()
+    }
+
     pub fn kill(self) {
         self.rf.into_inner().kill()
         // The process_command thread will exit, after Raft drops the reference

+ 91 - 0
kvraft/src/testing_utils/config.rs

@@ -0,0 +1,91 @@
+pub use anyhow::Result;
+use client::Clerk;
+use parking_lot::Mutex;
+use ruaft::rpcs::register_server;
+use ruaft::RpcClient;
+use server::KVServer;
+use std::sync::Arc;
+use testing_utils::memory_persister::MemoryStorage;
+
+struct ConfigState {
+    kv_servers: Vec<Option<Arc<KVServer>>>,
+    clerks: Vec<Option<Clerk>>,
+}
+
+pub struct Config {
+    network: Arc<Mutex<labrpc::Network>>,
+    server_count: usize,
+    state: Mutex<ConfigState>,
+    storage: MemoryStorage,
+    maxraftstate: usize,
+}
+
+impl Config {
+    fn server_name(i: usize) -> String {
+        format!("kvraft-server-{}", i)
+    }
+
+    fn client_name(client: usize, server: usize) -> String {
+        format!("kvraft-client-{}-to-{}", client, server)
+    }
+
+    fn start_server(&self, index: usize) -> Result<()> {
+        let mut clients = vec![];
+        {
+            let mut network = self.network.lock();
+            for j in 0..self.server_count {
+                clients.push(RpcClient::new(network.make_client(
+                    Self::client_name(index, j),
+                    Self::server_name(j),
+                )))
+            }
+        }
+
+        let persister = self.storage.at(index);
+
+        let kv = KVServer::new(clients, index, persister);
+        self.state.lock().kv_servers[index].replace(kv.clone());
+
+        let raft = std::rc::Rc::new(kv.raft());
+        register_server(raft, Self::server_name(index), self.network.as_ref())?;
+        Ok(())
+    }
+}
+
+pub fn make_config(
+    server_count: usize,
+    unreliable: bool,
+    maxraftstate: usize,
+) -> Config {
+    let network = labrpc::Network::run_daemon();
+    {
+        let mut unlocked_network = network.lock();
+        unlocked_network.set_reliable(!unreliable);
+        unlocked_network.set_long_delays(true);
+    }
+
+    let state = Mutex::new(ConfigState {
+        kv_servers: vec![None; server_count],
+        clerks: vec![],
+    });
+
+    let mut storage = MemoryStorage::default();
+    for _ in 0..server_count {
+        storage.make();
+    }
+
+    let cfg = Config {
+        network,
+        server_count,
+        state,
+        storage,
+        maxraftstate,
+    };
+
+    for i in 0..server_count {
+        cfg.start_server(i)
+            .expect("Starting server should not fail");
+    }
+
+    cfg
+}

+ 59 - 0
kvraft/src/testing_utils/memory_persister.rs

@@ -0,0 +1,59 @@
+use parking_lot::Mutex;
+use std::sync::Arc;
+
+pub struct State {
+    bytes: bytes::Bytes,
+    snapshot: Vec<u8>,
+}
+
+pub struct MemoryPersister {
+    state: Mutex<State>,
+}
+
+impl MemoryPersister {
+    pub fn new() -> Self {
+        Self {
+            state: Mutex::new(State {
+                bytes: bytes::Bytes::new(),
+                snapshot: vec![],
+            }),
+        }
+    }
+}
+
+impl ruaft::Persister for MemoryPersister {
+    fn read_state(&self) -> bytes::Bytes {
+        self.state.lock().bytes.clone()
+    }
+
+    fn save_state(&self, data: bytes::Bytes) {
+        self.state.lock().bytes = data;
+    }
+
+    fn state_size(&self) -> usize {
+        self.state.lock().bytes.len()
+    }
+
+    fn save_snapshot_and_state(&self, state: bytes::Bytes, snapshot: &[u8]) {
+        let mut this = self.state.lock();
+        this.bytes = state;
+        this.snapshot = snapshot.to_vec();
+    }
+}
+
+#[derive(Default)]
+pub struct MemoryStorage {
+    state_vec: Vec<Arc<MemoryPersister>>,
+}
+
+impl MemoryStorage {
+    pub fn make(&mut self) -> Arc<MemoryPersister> {
+        let persister = Arc::new(MemoryPersister::new());
+        self.state_vec.push(persister.clone());
+        persister
+    }
+
+    pub fn at(&self, index: usize) -> Arc<MemoryPersister> {
+        self.state_vec[index].clone()
+    }
+}

+ 2 - 0
kvraft/src/testing_utils/mod.rs

@@ -0,0 +1,2 @@
+mod config;
+mod memory_persister;