Преглед изворни кода

Integrate snapshot daemon into the main lib.

Jing Yang пре 5 година
родитељ
комит
6ce8d929e8
2 измењених фајлова са 13 додато и 1 уклоњено
  1. 6 1
      src/lib.rs
  2. 7 0
      src/snapshot.rs

+ 6 - 1
src/lib.rs

@@ -22,7 +22,7 @@ pub use crate::persister::Persister;
 pub(crate) use crate::raft_state::RaftState;
 pub(crate) use crate::raft_state::State;
 pub use crate::rpcs::RpcClient;
-use crate::snapshot::SnapshotDaemon;
+use crate::snapshot::{Snapshot, SnapshotDaemon};
 use crate::utils::retry_rpc;
 
 mod index_term;
@@ -202,6 +202,10 @@ where
         ));
         // The last step is to start running election timer.
         this.run_election_timer();
+        this.run_snapshot_daemon(Some(1 << 20), |index| Snapshot {
+            last_included_index: index,
+            data: vec![],
+        });
         this
     }
 }
@@ -900,6 +904,7 @@ where
         self.election.stop_election_timer();
         self.new_log_entry.take().map(|n| n.send(None));
         self.apply_command_signal.notify_all();
+        self.snapshot_daemon.trigger();
         self.stop_wait_group.wait();
         std::sync::Arc::try_unwrap(self.thread_pool)
             .expect(

+ 7 - 0
src/snapshot.rs

@@ -1,5 +1,6 @@
 use crate::{Index, Raft};
 use crossbeam_utils::sync::{Parker, Unparker};
+use std::sync::atomic::Ordering;
 
 pub struct Snapshot {
     pub last_included_index: Index,
@@ -37,11 +38,17 @@ impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
         let unparker = parker.unparker().clone();
         self.snapshot_daemon.unparker.replace(unparker.clone());
 
+        let keep_running = self.keep_running.clone();
         let rf = self.inner_state.clone();
         let persister = self.persister.clone();
+        let stop_wait_group = self.stop_wait_group.clone();
 
         std::thread::spawn(move || loop {
             parker.park();
+            if !keep_running.load(Ordering::SeqCst) {
+                drop(stop_wait_group);
+                break;
+            }
             if persister.state_size() >= max_state_size {
                 let (term, log_start) = {
                     let rf = rf.lock();