Pārlūkot izejas kodu

Run the servers in main.

Jing Yang 4 gadi atpakaļ
vecāks
revīzija
90290beca6
4 mainītis faili ar 90 papildinājumiem un 4 dzēšanām
  1. 2 0
      durio/Cargo.toml
  2. 12 4
      durio/src/main.rs
  3. 48 0
      durio/src/persister.rs
  4. 28 0
      durio/src/run.rs

+ 2 - 0
durio/Cargo.toml

@@ -14,8 +14,10 @@ categories = ["raft"]
 
 [dependencies]
 async-trait = "0.1"
+bytes = "1.0"
 futures-util = "0.3.15"
 kvraft = { path = "../kvraft" }
+parking_lot = "0.11.1"
 ruaft = { path = "..", features = ["integration-test"] }
 serde = "1.0"
 serde_derive = "1.0"

+ 12 - 4
durio/src/main.rs

@@ -1,13 +1,17 @@
-mod kv_service;
-mod raft_service;
-mod utils;
-
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 
 use serde_derive::{Deserialize, Serialize};
 use warp::Filter;
 
+use crate::run::run_kv_instance;
+
+mod kv_service;
+mod persister;
+mod raft_service;
+mod run;
+mod utils;
+
 #[derive(Deserialize, Serialize)]
 struct PutAppendBody {
     value: String,
@@ -15,6 +19,10 @@ struct PutAppendBody {
 
 #[tokio::main]
 async fn main() {
+    run_kv_instance(([127, 0, 0, 1], 9988).into(), vec![], 0)
+        .await
+        .expect("Running kv instance should not fail");
+
     let counter = Arc::new(AtomicUsize::new(0));
     let counter_clone = counter.clone();
     let get = warp::path!("kvstore" / "get" / String).map(move |key| {

+ 48 - 0
durio/src/persister.rs

@@ -0,0 +1,48 @@
+use parking_lot::Mutex;
+
+#[derive(Clone)]
+pub struct State {
+    pub bytes: bytes::Bytes,
+    pub snapshot: Vec<u8>,
+}
+
+pub struct Persister {
+    state: Mutex<State>,
+}
+
+impl Persister {
+    pub fn new() -> Self {
+        Self {
+            state: Mutex::new(State {
+                bytes: bytes::Bytes::new(),
+                snapshot: vec![],
+            }),
+        }
+    }
+}
+
+impl Default for Persister {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ruaft::Persister for Persister {
+    fn read_state(&self) -> bytes::Bytes {
+        self.state.lock().bytes.clone()
+    }
+
+    fn save_state(&self, data: bytes::Bytes) {
+        self.state.lock().bytes = data;
+    }
+
+    fn state_size(&self) -> usize {
+        self.state.lock().bytes.len()
+    }
+
+    fn save_snapshot_and_state(&self, state: bytes::Bytes, snapshot: &[u8]) {
+        let mut this = self.state.lock();
+        this.bytes = state;
+        this.snapshot = snapshot.to_vec();
+    }
+}

+ 28 - 0
durio/src/run.rs

@@ -0,0 +1,28 @@
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use kvraft::KVServer;
+
+use crate::kv_service::start_kv_service_server;
+use crate::persister::Persister;
+use crate::raft_service::{connect_to_raft_service, start_raft_service_server};
+
+pub(crate) async fn run_kv_instance(
+    addr: SocketAddr,
+    raft_peers: Vec<SocketAddr>,
+    me: usize,
+) -> std::io::Result<()> {
+    let mut remote_rafts = vec![];
+    for raft_peer in &raft_peers {
+        let remote_raft = connect_to_raft_service(*raft_peer).await?;
+        remote_rafts.push(remote_raft);
+    }
+
+    let persister = Arc::new(Persister::new());
+
+    let kv_server = KVServer::new(remote_rafts, me, persister, None);
+    let raft = Arc::new(kv_server.raft().clone());
+
+    start_raft_service_server(raft_peers[me], raft).await?;
+    start_kv_service_server(addr, kv_server).await
+}