Przeglądaj źródła

Replace parker with a condvar.

1. The creation API is much simpler.
2. We can afford to lose wakeups, so condvar is fine.
Jing Yang 3 lat temu
rodzic
commit
79778e383c
1 zmienionych plików z 14 dodań i 16 usunięć
  1. 14 16
      src/verify_authority.rs

+ 14 - 16
src/verify_authority.rs

@@ -1,8 +1,7 @@
 use crate::beat_ticker::{Beat, SharedBeatTicker};
 use crate::daemon_env::Daemon;
 use crate::{Index, Raft, Term, HEARTBEAT_INTERVAL_MILLIS};
-use crossbeam_utils::sync::{Parker, Unparker};
-use parking_lot::Mutex;
+use parking_lot::{Condvar, Mutex};
 use std::collections::VecDeque;
 use std::future::Future;
 use std::sync::atomic::Ordering;
@@ -77,7 +76,7 @@ impl VerifyAuthorityState {
 #[derive(Clone)]
 pub(crate) struct DaemonBeatTicker {
     beat_ticker: SharedBeatTicker,
-    unparker: Unparker,
+    condvar: Arc<Condvar>,
 }
 
 impl DaemonBeatTicker {
@@ -88,7 +87,7 @@ impl DaemonBeatTicker {
 
     pub fn tick(&self, beat: Beat) {
         self.beat_ticker.tick(beat);
-        self.unparker.unpark();
+        self.condvar.notify_one();
     }
 }
 
@@ -96,7 +95,7 @@ impl DaemonBeatTicker {
 pub(crate) struct VerifyAuthorityDaemon {
     state: Arc<Mutex<VerifyAuthorityState>>,
     beat_tickers: Vec<SharedBeatTicker>,
-    unparker: Option<Unparker>,
+    condvar: Arc<Condvar>,
 }
 
 impl VerifyAuthorityDaemon {
@@ -108,10 +107,15 @@ impl VerifyAuthorityDaemon {
             beat_tickers: (0..peer_count)
                 .map(|_| SharedBeatTicker::create())
                 .collect(),
-            unparker: None,
+            condvar: Arc::new(Condvar::new()),
         }
     }
 
+    pub fn wait_for(&self, timeout: Duration) {
+        let mut guard = self.state.lock();
+        self.condvar.wait_for(&mut guard, timeout);
+    }
+
     pub fn reset_state(&self, term: Term) {
         self.state.lock().reset(term);
         // Increase all beats by one to make sure upcoming verify authority
@@ -328,14 +332,12 @@ impl VerifyAuthorityDaemon {
     pub fn beat_ticker(&self, peer_index: usize) -> DaemonBeatTicker {
         DaemonBeatTicker {
             beat_ticker: self.beat_tickers[peer_index].clone(),
-            unparker: self.unparker.clone().unwrap(),
+            condvar: self.condvar.clone(),
         }
     }
 
     pub fn kill(&self) {
-        if let Some(unparker) = self.unparker.as_ref() {
-            unparker.unpark();
-        }
+        self.condvar.notify_all();
     }
 }
 
@@ -343,11 +345,7 @@ impl<Command: 'static + Send> Raft<Command> {
     const BEAT_RECORDING_MAX_PAUSE: Duration = Duration::from_millis(20);
 
     /// Create a thread and runs the verify authority daemon.
-    pub(crate) fn run_verify_authority_daemon(&mut self) {
-        let parker = Parker::new();
-        let unparker = parker.unparker().clone();
-        self.verify_authority_daemon.unparker.replace(unparker);
-
+    pub(crate) fn run_verify_authority_daemon(&self) {
         let me = self.me.clone();
         let keep_running = self.keep_running.clone();
         let daemon_env = self.daemon_env.clone();
@@ -361,7 +359,7 @@ impl<Command: 'static + Send> Raft<Command> {
 
             log::info!("{:?} verify authority daemon running ...", me);
             while keep_running.load(Ordering::Acquire) {
-                parker.park_timeout(Self::BEAT_RECORDING_MAX_PAUSE);
+                this_daemon.wait_for(Self::BEAT_RECORDING_MAX_PAUSE);
                 let (current_term, commit_index, sentinel) = {
                     let rf = rf.lock();
                     (rf.current_term, rf.commit_index, rf.sentinel_commit_index)