Quellcode durchsuchen

A new design for snapshots.

Jing Yang vor 4 Jahren
Ursprung
Commit
2aeebd895b
2 geänderte Dateien mit 48 neuen und 10 gelöschten Zeilen
  1. 2 4
      src/lib.rs
  2. 46 6
      src/snapshot.rs

+ 2 - 4
src/lib.rs

@@ -330,6 +330,7 @@ where
             };
             self.apply_command_signal.notify_one();
         }
+        self.snapshot_daemon.log_grow(rf.log.start(), rf.log.end());
 
         AppendEntriesReply {
             term: args.term,
@@ -987,8 +988,5 @@ impl ElectionState {
 }
 
 impl<C> Raft<C> {
-    pub const NO_SNAPSHOT: fn(Index) -> Snapshot = |index| Snapshot {
-        last_included_index: index,
-        data: vec![],
-    };
+    pub const NO_SNAPSHOT: fn(Index) = |_| {};
 }

+ 46 - 6
src/snapshot.rs

@@ -3,6 +3,8 @@ use std::sync::atomic::Ordering;
 use crossbeam_utils::sync::{Parker, Unparker};
 
 use crate::{Index, Raft};
+use parking_lot::{Condvar, Mutex};
+use std::sync::Arc;
 
 #[derive(Clone, Debug, Default)]
 pub struct Snapshot {
@@ -13,16 +15,22 @@ pub struct Snapshot {
 #[derive(Clone, Debug, Default)]
 pub(crate) struct SnapshotDaemon {
     unparker: Option<Unparker>,
+    current_snapshot: Arc<(Mutex<Snapshot>, Condvar)>,
 }
 
-pub trait RequestSnapshotFnMut:
-    'static + Send + FnMut(Index) -> Snapshot
-{
-}
+pub trait RequestSnapshotFnMut: 'static + Send + FnMut(Index) {}
 
-impl<T: 'static + Send + FnMut(Index) -> Snapshot> RequestSnapshotFnMut for T {}
+impl<T: 'static + Send + FnMut(Index)> RequestSnapshotFnMut for T {}
 
 impl SnapshotDaemon {
+    pub(crate) fn save_snapshot(&self, snapshot: Snapshot) {
+        let mut curr = self.current_snapshot.0.lock();
+        if curr.last_included_index < snapshot.last_included_index {
+            *curr = snapshot;
+        }
+        self.current_snapshot.1.notify_one();
+    }
+
     pub(crate) fn trigger(&self) {
         match &self.unparker {
             Some(unparker) => unparker.unpark(),
@@ -30,12 +38,32 @@ impl SnapshotDaemon {
         }
     }
 
+    const MIN_SNAPSHOT_INDEX_INTERVAL: usize = 100;
+
+    pub(crate) fn log_grow(&self, first_index: Index, last_index: Index) {
+        let last_included_index =
+            self.current_snapshot.0.lock().last_included_index;
+        if last_included_index > first_index
+            || last_index - first_index > Self::MIN_SNAPSHOT_INDEX_INTERVAL
+        {
+            self.trigger();
+        }
+    }
+
     pub(crate) fn kill(&self) {
         self.trigger();
+        // Acquire the lock to make sure the daemon thread either has been
+        // waiting on the condition, or has not checked `keep_running` yet.
+        let _ = self.current_snapshot.0.lock();
+        self.current_snapshot.1.notify_all();
     }
 }
 
 impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
+    pub fn save_snapshot(&self, snapshot: Snapshot) {
+        self.snapshot_daemon.save_snapshot(snapshot)
+    }
+
     pub(crate) fn run_snapshot_daemon(
         &mut self,
         max_state_size: Option<usize>,
@@ -53,6 +81,7 @@ impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
         let keep_running = self.keep_running.clone();
         let rf = self.inner_state.clone();
         let persister = self.persister.clone();
+        let snapshot_daemon = self.snapshot_daemon.clone();
         let stop_wait_group = self.stop_wait_group.clone();
 
         std::thread::spawn(move || loop {
@@ -62,12 +91,23 @@ impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
                 drop(keep_running);
                 drop(rf);
                 drop(persister);
+                drop(snapshot_daemon);
                 drop(stop_wait_group);
                 break;
             }
             if persister.state_size() >= max_state_size {
                 let log_start = rf.lock().log.first_index_term();
-                let snapshot = request_snapshot(log_start.index + 1);
+                let snapshot = {
+                    let mut snapshot =
+                        snapshot_daemon.current_snapshot.0.lock();
+                    if keep_running.load(Ordering::SeqCst)
+                        && snapshot.last_included_index <= log_start.index
+                    {
+                        request_snapshot(log_start.index + 1);
+                        snapshot_daemon.current_snapshot.1.wait(&mut snapshot);
+                    }
+                    snapshot.clone()
+                };
 
                 let mut rf = rf.lock();
                 if rf.log.first_index_term() != log_start {