瀏覽代碼

Half implemented snapshot.

Jing Yang 5 年之前
父節點
當前提交
951032231d
共有 2 個文件被更改,包括 78 次插入0 次删除
  1. 4 0
      src/lib.rs
  2. 74 0
      src/snapshot.rs

+ 4 - 0
src/lib.rs

@@ -22,6 +22,7 @@ pub use crate::persister::Persister;
 pub(crate) use crate::raft_state::RaftState;
 pub(crate) use crate::raft_state::State;
 pub use crate::rpcs::RpcClient;
+use crate::snapshot::SnapshotDaemon;
 use crate::utils::retry_rpc;
 
 mod index_term;
@@ -30,6 +31,7 @@ mod log_array;
 mod persister;
 mod raft_state;
 pub mod rpcs;
+mod snapshot;
 pub mod utils;
 
 #[derive(
@@ -69,6 +71,7 @@ pub struct Raft<Command> {
     apply_command_signal: Arc<Condvar>,
     keep_running: Arc<AtomicBool>,
     election: Arc<ElectionState>,
+    snapshot_daemon: Arc<SnapshotDaemon>,
 
     thread_pool: Arc<tokio::runtime::Runtime>,
 
@@ -182,6 +185,7 @@ where
             apply_command_signal: Arc::new(Default::default()),
             keep_running: Arc::new(Default::default()),
             election: Arc::new(election),
+            snapshot_daemon: Default::default(),
             thread_pool: Arc::new(thread_pool),
             stop_wait_group: WaitGroup::new(),
         };

+ 74 - 0
src/snapshot.rs

@@ -0,0 +1,74 @@
+use crate::{Index, Raft};
+use crossbeam_utils::sync::{Parker, Unparker};
+
+pub struct Snapshot {
+    pub data: Vec<u8>,
+    pub last_included_index: Index,
+}
+
+#[derive(Clone, Debug, Default)]
+pub(crate) struct SnapshotDaemon {
+    unparker: Option<Unparker>,
+}
+
+impl SnapshotDaemon {
+    pub(crate) fn trigger_snapshot(&self) {
+        match self.unparker {
+            Some(&unparker) => unparker.unpark(),
+            None => {}
+        }
+    }
+}
+
+impl<C: 'static + Default + Send> Raft<C> {
+    pub(crate) fn run_snapshot_daemon<Func>(
+        &self,
+        max_state_size: Option<usize>,
+        mut request_snapshot: Func,
+    ) where
+        Func: 'static + Send + FnMut(Index) -> Snapshot,
+    {
+        let max_state_size = match max_state_size {
+            Some(max_state_size) => max_state_size,
+            None => return,
+        };
+
+        let parker = Parker::new();
+        let unparker = parker.unparker();
+        self.snapshot_daemon.unparker.replace(unparker.clone());
+
+        let rf = self.inner_state.clone();
+        let persister = self.persister.clone();
+
+        std::thread::spawn(move || loop {
+            parker.park();
+            if current_state_size >= max_state_size {
+                let (term, log_start) = {
+                    let rf = rf.lock();
+                    (rf.current_term, rf.log.first_index_term())
+                };
+                let snapshot = request_snapshot(log_start.index + 1);
+
+                let mut rf = rf.lock();
+                if rf.current_term != term
+                    || rf.log.first_index_term() != log_start
+                {
+                    // Term has changed, or another snapshot was installed.
+                    unparker.unpark();
+                    continue;
+                }
+                if snapshot.last_included_index <= rf.log.start()
+                    || snapshot.last_included_index >= rf.log.end()
+                {
+                    // TODO(ditsing): Something happened.
+                    unparker.unpark();
+                    continue;
+                }
+
+                rf.log.shift(snapshot.last_included_index, snapshot.data);
+                // TOOD(ditsing): fix.
+                // this.persisted_state(rf.persisted_state());
+            }
+        });
+    }
+}