瀏覽代碼

Replace SharedBeatTicker with DaemonBeatTicker.

The new BeatTicker wakes up the daemon after each tick.
Jing Yang 3 年之前
父節點
當前提交
466c1d4d88
共有 3 個文件被更改,包括 30 次插入9 次删除
  1. 2 2
      src/heartbeats.rs
  2. 4 4
      src/sync_log_entries.rs
  3. 24 3
      src/verify_authority.rs

+ 2 - 2
src/heartbeats.rs

@@ -3,9 +3,9 @@ use std::time::Duration;
 
 use parking_lot::Mutex;
 
-use crate::beat_ticker::SharedBeatTicker;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, RPC_DEADLINE};
+use crate::verify_authority::DaemonBeatTicker;
 use crate::{AppendEntriesArgs, Raft, RaftState, RemoteRaft};
 
 #[derive(Clone)]
@@ -114,7 +114,7 @@ where
         rpc_client: impl RemoteRaft<Command>,
         args: AppendEntriesArgs<Command>,
         term_watermark: TermMarker<Command>,
-        beat_ticker: SharedBeatTicker,
+        beat_ticker: DaemonBeatTicker,
     ) -> std::io::Result<()> {
         let term = args.term;
         let beat = beat_ticker.next_beat();

+ 4 - 4
src/sync_log_entries.rs

@@ -4,12 +4,12 @@ use std::time::Duration;
 
 use parking_lot::{Condvar, Mutex};
 
-use crate::beat_ticker::SharedBeatTicker;
 use crate::check_or_record;
 use crate::daemon_env::{Daemon, ErrorKind};
 use crate::index_term::IndexTerm;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
+use crate::verify_authority::DaemonBeatTicker;
 use crate::{
     AppendEntriesArgs, InstallSnapshotArgs, Peer, Raft, RaftState, RemoteRaft,
     Term, HEARTBEAT_INTERVAL_MILLIS,
@@ -170,7 +170,7 @@ where
         opening: Arc<AtomicUsize>,
         apply_command_signal: Arc<Condvar>,
         term_marker: TermMarker<Command>,
-        beat_ticker: SharedBeatTicker,
+        beat_ticker: DaemonBeatTicker,
         task_number: TaskNumber,
     ) {
         if opening.swap(0, Ordering::SeqCst) == 0 {
@@ -458,7 +458,7 @@ where
     async fn append_entries(
         rpc_client: &dyn RemoteRaft<Command>,
         args: AppendEntriesArgs<Command>,
-        beat_ticker: SharedBeatTicker,
+        beat_ticker: DaemonBeatTicker,
     ) -> std::io::Result<SyncLogEntriesResult> {
         let term = args.term;
         let beat = beat_ticker.next_beat();
@@ -501,7 +501,7 @@ where
     async fn install_snapshot(
         rpc_client: &dyn RemoteRaft<Command>,
         args: InstallSnapshotArgs,
-        beat_ticker: SharedBeatTicker,
+        beat_ticker: DaemonBeatTicker,
     ) -> std::io::Result<SyncLogEntriesResult> {
         let term = args.term;
         let beat = beat_ticker.next_beat();

+ 24 - 3
src/verify_authority.rs

@@ -74,6 +74,24 @@ impl VerifyAuthorityState {
     }
 }
 
+#[derive(Clone)]
+pub(crate) struct DaemonBeatTicker {
+    beat_ticker: SharedBeatTicker,
+    unparker: Unparker,
+}
+
+impl DaemonBeatTicker {
+    pub fn next_beat(&self) -> Beat {
+        let beat = self.beat_ticker.next_beat();
+        beat
+    }
+
+    pub fn tick(&self, beat: Beat) {
+        self.beat_ticker.tick(beat);
+        self.unparker.unpark();
+    }
+}
+
 #[derive(Clone)]
 pub(crate) struct VerifyAuthorityDaemon {
     state: Arc<Mutex<VerifyAuthorityState>>,
@@ -307,8 +325,11 @@ impl VerifyAuthorityDaemon {
         }
     }
 
-    pub fn beat_ticker(&self, peer_index: usize) -> SharedBeatTicker {
-        self.beat_tickers[peer_index].clone()
+    pub fn beat_ticker(&self, peer_index: usize) -> DaemonBeatTicker {
+        DaemonBeatTicker {
+            beat_ticker: self.beat_tickers[peer_index].clone(),
+            unparker: self.unparker.clone().unwrap(),
+        }
     }
 
     pub fn kill(&self) {
@@ -395,7 +416,7 @@ impl<Command: 'static + Send> Raft<Command> {
         })
     }
 
-    pub(crate) fn beat_ticker(&self, peer_index: usize) -> SharedBeatTicker {
+    pub(crate) fn beat_ticker(&self, peer_index: usize) -> DaemonBeatTicker {
         self.verify_authority_daemon.beat_ticker(peer_index)
     }
 }