Forráskód Böngészése

Implement a blocking mechanism for snapshots.

Jing Yang 5 éve
szülő
commit
32eeb627cf
1 módosított fájl, 40 hozzáadás és 19 törlés
  1. 40 19
      src/snapshot.rs

+ 40 - 19
src/snapshot.rs

@@ -1,26 +1,48 @@
 use crate::{Index, Raft};
-use crossbeam_utils::sync::{Parker, Unparker};
+use parking_lot::{Condvar, Mutex};
+use std::sync::atomic::{AtomicUsize, Ordering};
 
/// A point-in-time snapshot of replicated state, cut at a specific log index.
pub struct Snapshot {
    /// Index of the last log entry this snapshot covers; entries at or
    /// before this index are subsumed by `data` (see `Log::shift` usage).
    pub last_included_index: Index,
    /// Opaque snapshot payload stored alongside the log — presumably the
    /// serialized state machine up to `last_included_index`; TODO confirm.
    pub data: Vec<u8>,
}
 
-#[derive(Clone, Debug, Default)]
/// Coordination handle between snapshot triggers and the snapshot daemon
/// thread: producers bump `count`; the daemon sleeps on `cond` until the
/// counter moves past the value it last observed.
#[derive(Debug, Default)]
pub(crate) struct SnapshotDaemon {
    // Monotonic trigger counter, incremented by trigger_snapshot[_soft]().
    count: AtomicUsize,
    // Guards the daemon's last-observed counter value (the inner usize)
    // and pairs with `cond` for blocking waits.
    mutex: Mutex<usize>,
    cond: Condvar,
}
 
 impl SnapshotDaemon {
-    pub(crate) fn trigger_snapshot(&self) {
-        match self.unparker {
-            Some(&unparker) => unparker.unpark(),
-            None => {}
+    pub(crate) fn trigger_snapshot_soft(&self) -> usize {
+        self.count.fetch_add(1, Ordering::SeqCst)
+    }
+
+    pub(crate) fn trigger_snapshot(&self) -> usize {
+        let prev = *self.mutex.lock();
+        let curr = self.trigger_snapshot_soft();
+        self.cond.notify_one();
+        assert!(curr >= prev);
+        assert_ne!(curr, usize::MAX, "The counter overflowed.");
+        curr
+    }
+
+    fn wait_for_triggering(&self) -> (usize, usize) {
+        let mut guard = self.mutex.lock();
+        loop {
+            let curr = self.count.load(Ordering::SeqCst);
+            if curr != *guard {
+                let prev = *guard;
+                *guard = curr;
+                return (prev, curr);
+            }
+            self.cond.wait(&mut guard);
         }
     }
 }
 
-impl<C: 'static + Default + Send> Raft<C> {
+impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
     pub(crate) fn run_snapshot_daemon<Func>(
         &self,
         max_state_size: Option<usize>,
@@ -33,16 +55,13 @@ impl<C: 'static + Default + Send> Raft<C> {
             None => return,
         };
 
-        let parker = Parker::new();
-        let unparker = parker.unparker();
-        self.snapshot_daemon.unparker.replace(unparker.clone());
-
         let rf = self.inner_state.clone();
+        let snapshot_daemon = self.snapshot_daemon.clone();
         let persister = self.persister.clone();
 
         std::thread::spawn(move || loop {
-            parker.park();
-            if current_state_size >= max_state_size {
+            snapshot_daemon.wait_for_triggering();
+            if persister.state_size() >= max_state_size {
                 let (term, log_start) = {
                     let rf = rf.lock();
                     (rf.current_term, rf.log.first_index_term())
@@ -54,20 +73,22 @@ impl<C: 'static + Default + Send> Raft<C> {
                     || rf.log.first_index_term() != log_start
                 {
                     // Term has changed, or another snapshot was installed.
-                    unparker.unpark();
+                    snapshot_daemon.trigger_snapshot_soft();
                     continue;
                 }
                 if snapshot.last_included_index <= rf.log.start()
                     || snapshot.last_included_index >= rf.log.end()
                 {
                     // TODO(ditsing): Something happened.
-                    unparker.unpark();
+                    snapshot_daemon.trigger_snapshot_soft();
                     continue;
                 }
 
                 rf.log.shift(snapshot.last_included_index, snapshot.data);
-                // TOOD(ditsing): fix.
-                // this.persisted_state(rf.persisted_state());
+                persister.save_snapshot_and_state(
+                    rf.persisted_state().into(),
+                    rf.log.snapshot().1,
+                );
             }
         });
     }