Jing Yang 5 rokov pred
rodič
commit
a8a54701cb
2 zmenil súbory, kde vykonal 62 pridanie a 1 odobranie
  1. 9 1
      tests/kvraft/common.rs
  2. 53 0
      tests/kvraft/server.rs

+ 9 - 1
tests/kvraft/common.rs

@@ -2,7 +2,15 @@ use rand::{thread_rng, RngCore};
 use std::sync::atomic::{AtomicU64, Ordering};
 
 #[derive(
-    Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize,
+    Clone,
+    Debug,
+    Default,
+    Ord,
+    PartialOrd,
+    Eq,
+    PartialEq,
+    Serialize,
+    Deserialize,
 )]
 pub struct UniqueId {
     clerk_id: u64,

+ 53 - 0
tests/kvraft/server.rs

@@ -1 +1,54 @@
+use super::common::UniqueId;
+use parking_lot::Mutex;
+use ruaft::{Persister, Raft, RpcClient};
+use std::collections::HashMap;
+use std::sync::atomic::AtomicBool;
+use std::sync::mpsc::{channel, Receiver};
+use std::sync::Arc;
 
+struct KVServer {
+    state: Mutex<KVServerState>,
+    rf: Raft<KVOp>,
+    command_channel: Receiver<(usize, KVOp)>,
+    shutdown: AtomicBool,
+    // snapshot
+}
+
+#[derive(Clone, Default, Serialize, Deserialize)]
+struct KVOp {
+    unique_id: UniqueId,
+}
+
+#[derive(Default)]
+struct KVServerState {
+    kv: HashMap<String, String>,
+    debug_kv: HashMap<String, String>,
+    applied_op: HashMap<UniqueId, KVOp>,
+}
+
+impl KVServer {
+    pub fn new(
+        servers: Vec<RpcClient>,
+        me: usize,
+        persister: Arc<dyn Persister>,
+    ) -> Self {
+        let (tx, rx) = channel();
+        let apply_command = move |index, command| {
+            tx.send((index, command))
+                .expect("The receiving end of apply command channel should have not been dropped");
+        };
+        Self {
+            state: Default::default(),
+            rf: Raft::new(
+                servers,
+                me,
+                persister,
+                apply_command,
+                None,
+                Raft::<KVOp>::NO_SNAPSHOT,
+            ),
+            command_channel: rx,
+            shutdown: AtomicBool::new(false),
+        }
+    }
+}