Forráskód Böngészése

Implement snapshot taking for kv server.

A basic test in ruaft is added as well.
Jing Yang 4 éve
szülő
commit
36506a9be5

+ 1 - 0
kvraft/src/lib.rs

@@ -12,6 +12,7 @@ mod client;
 mod common;
 mod server;
 
+mod snapshot_holder;
 pub mod testing_utils;
 
 pub use client::Clerk;

+ 14 - 6
kvraft/src/server.rs

@@ -13,6 +13,7 @@ use crate::common::{
     ClerkId, GetArgs, GetReply, KVError, PutAppendArgs, PutAppendEnum,
     PutAppendReply, UniqueId,
 };
+use crate::snapshot_holder::SnapshotHolder;
 
 pub struct KVServer {
     me: AtomicUsize,
@@ -30,11 +31,12 @@ pub struct UniqueKVOp {
     unique_id: UniqueId,
 }
 
-#[derive(Default)]
+#[derive(Default, Serialize, Deserialize)]
 struct KVServerState {
     kv: HashMap<String, String>,
     debug_kv: HashMap<String, String>,
     applied_op: HashMap<ClerkId, (UniqueId, CommitResult)>,
+    #[serde(skip)]
     queries: HashMap<UniqueId, Arc<ResultHolder>>,
 }
 
@@ -58,7 +60,7 @@ struct ResultHolder {
     condvar: Condvar,
 }
 
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 enum CommitResult {
     Get(Option<String>),
     Put,
@@ -93,12 +95,15 @@ impl KVServer {
         servers: Vec<RpcClient>,
         me: usize,
         persister: Arc<dyn Persister>,
+        max_state_size_bytes: Option<usize>,
     ) -> Arc<Self> {
         let (tx, rx) = channel();
         let apply_command = move |index, command| {
             tx.send((index, command))
                 .expect("The receiving end of apply command channel should have not been dropped");
         };
+        let snapshot_holder = Arc::new(SnapshotHolder::default());
+        let snapshot_holder_clone = snapshot_holder.clone();
         let ret = Arc::new(Self {
             me: AtomicUsize::new(me),
             state: Default::default(),
@@ -107,11 +112,11 @@ impl KVServer {
                 me,
                 persister,
                 apply_command,
-                None,
-                Raft::<UniqueKVOp>::NO_SNAPSHOT,
+                max_state_size_bytes,
+                move |index| snapshot_holder_clone.request_snapshot(index),
             )),
         });
-        ret.process_command(rx);
+        ret.process_command(snapshot_holder, rx);
         ret
     }
 
@@ -179,13 +184,16 @@ impl KVServer {
 
     fn process_command(
         self: &Arc<Self>,
+        snapshot_holder: Arc<SnapshotHolder<KVServerState>>,
         command_channel: Receiver<IndexedCommand>,
     ) {
         let this = Arc::downgrade(self);
         std::thread::spawn(move || {
-            while let Ok((_, command)) = command_channel.recv() {
+            while let Ok((index, command)) = command_channel.recv() {
                 if let Some(this) = this.upgrade() {
                     this.apply_op(command.unique_id, command.me, command.op);
+                    snapshot_holder.take_snapshot(&this.state.lock(), index);
+                    snapshot_holder.unblock_response();
                 } else {
                     break;
                 }

+ 85 - 0
kvraft/src/snapshot_holder.rs

@@ -0,0 +1,85 @@
+use std::marker::PhantomData;
+use std::sync::Arc;
+
+use parking_lot::{Condvar, Mutex};
+use serde::Serialize;
+
+use ruaft::Snapshot;
+
+#[derive(Default)]
+pub(crate) struct SnapshotHolder<T> {
+    snapshot_requests: Mutex<Vec<(usize, Arc<Condvar>)>>,
+    current_snapshot: Mutex<Snapshot>,
+    phantom: PhantomData<T>,
+}
+
+impl<T> SnapshotHolder<T> {
+    pub fn request_snapshot(&self, min_index: usize) -> Snapshot {
+        let condvar = {
+            let mut requests = self.snapshot_requests.lock();
+            let pos =
+                requests.binary_search_by_key(&min_index, |&(index, _)| index);
+            match pos {
+                Ok(pos) => requests[pos].1.clone(),
+                Err(pos) => {
+                    assert!(pos == 0 || requests[pos - 1].0 < min_index);
+                    assert!(
+                        pos == requests.len() - 1
+                            || requests[pos + 1].0 > min_index
+                    );
+                    let condvar = Arc::new(Condvar::new());
+                    requests.insert(pos, (min_index, condvar.clone()));
+                    condvar
+                }
+            }
+        };
+
+        // Now wait for the snapshot
+        let mut current_snapshot = self.current_snapshot.lock();
+        while current_snapshot.last_included_index < min_index {
+            condvar.wait(&mut current_snapshot);
+        }
+
+        current_snapshot.clone()
+    }
+}
+
+impl<T: Serialize> SnapshotHolder<T> {
+    const MIN_SNAPSHOT_INDEX_INTERVAL: usize = 100;
+    pub fn take_snapshot(&self, state: &T, curr: usize) {
+        let expired = self.current_snapshot.lock().last_included_index
+            + Self::MIN_SNAPSHOT_INDEX_INTERVAL
+            <= curr;
+        let requested = self
+            .snapshot_requests
+            .lock()
+            .first()
+            .map_or(false, |&(min_index, _)| min_index <= curr);
+
+        if expired || requested {
+            let data = bincode::serialize(state)
+                .expect("Serialization should never fail.");
+            let mut current_snapshot = self.current_snapshot.lock();
+            assert!(current_snapshot.last_included_index < curr);
+            *current_snapshot = Snapshot {
+                last_included_index: curr,
+                data,
+            }
+        }
+    }
+
+    pub fn unblock_response(&self) {
+        let curr = self.current_snapshot.lock().last_included_index;
+        let mut requests = self.snapshot_requests.lock();
+        let mut processed = 0;
+        for (index, condvar) in requests.iter() {
+            if *index <= curr {
+                processed += 1;
+                condvar.notify_all();
+            } else {
+                break;
+            }
+        }
+        requests.drain(0..processed);
+    }
+}

+ 2 - 1
kvraft/src/testing_utils/config.rs

@@ -58,7 +58,8 @@ impl Config {
 
         let persister = self.storage.lock().at(index);
 
-        let kv = KVServer::new(clients, index, persister);
+        let kv =
+            KVServer::new(clients, index, persister, Some(self.maxraftstate));
         self.state.lock().kv_servers[index].replace(kv.clone());
 
         let raft = std::rc::Rc::new(kv.raft());

+ 1 - 1
tests/snapshot_tests.rs

@@ -10,7 +10,7 @@ fn install_snapshot_rpc() {
     const SERVERS: usize = 3;
     const MAX_RAFT_STATE: usize = 1000;
     const KEY: &str = "a";
-    let cfg = Arc::new(make_config(SERVERS, true, 0));
+    let cfg = Arc::new(make_config(SERVERS, true, MAX_RAFT_STATE));
     defer!(cfg.clean_up());
 
     let mut clerk = cfg.make_clerk();