Kaynağa Gözat

Implement the blocking mechanism using parker.

Jing Yang 5 yıl önce
ebeveyn
işleme
6e2b80dc96
2 değiştirilmiş dosya ile 16 ekleme ve 35 silme
  1. 1 1
      src/lib.rs
  2. 15 34
      src/snapshot.rs

+ 1 - 1
src/lib.rs

@@ -71,7 +71,7 @@ pub struct Raft<Command> {
     apply_command_signal: Arc<Condvar>,
     keep_running: Arc<AtomicBool>,
     election: Arc<ElectionState>,
-    snapshot_daemon: Arc<SnapshotDaemon>,
+    snapshot_daemon: SnapshotDaemon,
 
     thread_pool: Arc<tokio::runtime::Runtime>,
 

+ 15 - 34
src/snapshot.rs

@@ -1,50 +1,28 @@
 use crate::{Index, Raft};
-use parking_lot::{Condvar, Mutex};
-use std::sync::atomic::{AtomicUsize, Ordering};
+use crossbeam_utils::sync::{Parker, Unparker};
 
 pub struct Snapshot {
     pub last_included_index: Index,
     pub data: Vec<u8>,
 }
 
-#[derive(Debug, Default)]
+#[derive(Clone, Debug, Default)]
 pub(crate) struct SnapshotDaemon {
-    count: AtomicUsize,
-    mutex: Mutex<usize>,
-    cond: Condvar,
+    unparker: Option<Unparker>,
 }
 
 impl SnapshotDaemon {
-    pub(crate) fn trigger_snapshot_soft(&self) -> usize {
-        self.count.fetch_add(1, Ordering::SeqCst)
-    }
-
-    pub(crate) fn trigger_snapshot(&self) -> usize {
-        let prev = *self.mutex.lock();
-        let curr = self.trigger_snapshot_soft();
-        self.cond.notify_one();
-        assert!(curr >= prev);
-        assert_ne!(curr, usize::MAX, "The counter overflowed.");
-        curr
-    }
-
-    fn wait_for_triggering(&self) -> (usize, usize) {
-        let mut guard = self.mutex.lock();
-        loop {
-            let curr = self.count.load(Ordering::SeqCst);
-            if curr != *guard {
-                let prev = *guard;
-                *guard = curr;
-                return (prev, curr);
-            }
-            self.cond.wait(&mut guard);
+    pub(crate) fn trigger(&self) {
+        match &self.unparker {
+            Some(unparker) => unparker.unpark(),
+            None => {}
         }
     }
 }
 
 impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
     pub(crate) fn run_snapshot_daemon<Func>(
-        &self,
+        &mut self,
         max_state_size: Option<usize>,
         mut request_snapshot: Func,
     ) where
@@ -55,12 +33,15 @@ impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
             None => return,
         };
 
+        let parker = Parker::new();
+        let unparker = parker.unparker().clone();
+        self.snapshot_daemon.unparker.replace(unparker.clone());
+
         let rf = self.inner_state.clone();
-        let snapshot_daemon = self.snapshot_daemon.clone();
         let persister = self.persister.clone();
 
         std::thread::spawn(move || loop {
-            snapshot_daemon.wait_for_triggering();
+            parker.park();
             if persister.state_size() >= max_state_size {
                 let (term, log_start) = {
                     let rf = rf.lock();
@@ -73,14 +54,14 @@ impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
                     || rf.log.first_index_term() != log_start
                 {
                     // Term has changed, or another snapshot was installed.
-                    snapshot_daemon.trigger_snapshot_soft();
+                    unparker.unpark();
                     continue;
                 }
                 if snapshot.last_included_index <= rf.log.start()
                     || snapshot.last_included_index >= rf.log.end()
                 {
                     // TODO(ditsing): Something happened.
-                    snapshot_daemon.trigger_snapshot_soft();
+                    unparker.unpark();
                     continue;
                 }