Explorar o código

Trigger heartbeats when authority needs to be verified.

Jing Yang %!s(int64=3) %!d(string=hai) anos
pai
achega
1c6bbbc54f
Modificáronse 3 ficheiros con 27 adicións e 1 borrados
  1. 23 1
      src/heartbeats.rs
  2. 3 0
      src/lib.rs
  3. 1 0
      src/verify_authority.rs

+ 23 - 1
src/heartbeats.rs

@@ -8,6 +8,22 @@ use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, RPC_DEADLINE};
 use crate::{AppendEntriesArgs, Raft, RaftState, RemoteRaft};
 
+#[derive(Clone)]
+pub(crate) struct HeartbeatsDaemon {
+    sender: tokio::sync::broadcast::Sender<()>,
+}
+
+impl HeartbeatsDaemon {
+    pub fn create() -> Self {
+        let (sender, _) = tokio::sync::broadcast::channel(1);
+        Self { sender }
+    }
+
+    pub fn trigger(&self) {
+        let _ = self.sender.send(());
+    }
+}
+
 // Command must be
 // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
 // 1. clone: they are copied to the persister.
@@ -38,6 +54,8 @@ where
                 // A function that casts an "authoritative" vote with Ok()
                 // responses to heartbeats.
                 let beat_ticker = self.beat_ticker(peer_index);
+                // A on-demand trigger to sending a heartbeat.
+                let mut trigger = self.heartbeats_daemon.sender.subscribe();
                 // RPC client must be cloned into the outer async function.
                 let rpc_client = rpc_client.clone();
                 // Shutdown signal.
@@ -45,7 +63,11 @@ where
                 self.thread_pool.spawn(async move {
                     let mut interval = tokio::time::interval(interval);
                     while keep_running.load(Ordering::SeqCst) {
-                        interval.tick().await;
+                        let tick = interval.tick();
+                        let trigger = trigger.recv();
+                        futures_util::pin_mut!(tick, trigger);
+                        let _ =
+                            futures_util::future::select(tick, trigger).await;
                         if let Some(args) = Self::build_heartbeat(&rf) {
                             tokio::spawn(Self::send_heartbeat(
                                 rpc_client.clone(),

+ 3 - 0
src/lib.rs

@@ -11,6 +11,7 @@ use crate::apply_command::ApplyCommandFnMut;
 pub use crate::apply_command::ApplyCommandMessage;
 use crate::daemon_env::{DaemonEnv, ThreadEnv};
 use crate::election::ElectionState;
+use crate::heartbeats::HeartbeatsDaemon;
 use crate::index_term::IndexTerm;
 use crate::persister::PersistedRaftState;
 pub use crate::persister::Persister;
@@ -71,6 +72,7 @@ pub struct Raft<Command> {
     election: Arc<ElectionState>,
     snapshot_daemon: SnapshotDaemon,
     verify_authority_daemon: VerifyAuthorityDaemon,
+    heartbeats_daemon: HeartbeatsDaemon,
 
     thread_pool: Arc<tokio::runtime::Runtime>,
 
@@ -211,6 +213,7 @@ where
             election: Arc::new(election),
             snapshot_daemon: Default::default(),
             verify_authority_daemon: VerifyAuthorityDaemon::create(peer_size),
+            heartbeats_daemon: HeartbeatsDaemon::create(),
             thread_pool: Arc::new(thread_pool),
             daemon_env,
             stop_wait_group: WaitGroup::new(),

+ 1 - 0
src/verify_authority.rs

@@ -352,6 +352,7 @@ impl<Command: 'static + Send> Raft<Command> {
         let receiver = self
             .verify_authority_daemon
             .verify_authority_async(term, commit_index);
+        self.heartbeats_daemon.trigger();
         receiver.map(|receiver| async move {
             receiver
                 .await