Przeglądaj źródła

Use one heartbeat task instead of N.

Jing Yang 3 lat temu
rodzic
commit
9bf70da709
1 zmienionych plików z 20 dodań i 21 usunięć
  1. 20 21
      src/heartbeats.rs

+ 20 - 21
src/heartbeats.rs

@@ -67,29 +67,28 @@ impl<Command: ReplicableCommand> Raft<Command> {
     /// The request message is a stripped down version of `AppendEntries`. The
     /// response from the peer is ignored.
     pub(crate) fn schedule_heartbeats(&self, interval: Duration) {
-        for peer in self.peers.clone().into_iter() {
-            if peer != self.me {
-                // rf is now owned by the outer async function.
-                let rf = self.inner_state.clone();
-                // A on-demand trigger to sending a heartbeat.
-                let mut trigger = self.heartbeats_daemon.sender.subscribe();
-                // Shutdown signal.
-                let keep_running = self.keep_running.clone();
-                self.thread_pool.spawn(async move {
-                    let mut interval = tokio::time::interval(interval);
-                    while keep_running.load(Ordering::Relaxed) {
-                        let tick = interval.tick();
-                        let trigger = trigger.recv();
-                        futures_util::pin_mut!(tick, trigger);
-                        let _ =
-                            futures_util::future::select(tick, trigger).await;
-                        if let Some(args) = Self::build_heartbeat(&rf) {
-                            tokio::spawn(Self::send_heartbeat(peer, args));
-                        }
+        // rf is now owned by the outer async function.
+        let rf = self.inner_state.clone();
+        // A on-demand trigger to sending a heartbeat.
+        let mut trigger = self.heartbeats_daemon.sender.subscribe();
+        // Shutdown signal.
+        let keep_running = self.keep_running.clone();
+        let peers = self.peers.clone();
+
+        self.thread_pool.spawn(async move {
+            let mut interval = tokio::time::interval(interval);
+            while keep_running.load(Ordering::Relaxed) {
+                let tick = interval.tick();
+                let trigger = trigger.recv();
+                futures_util::pin_mut!(tick, trigger);
+                let _ = futures_util::future::select(tick, trigger).await;
+                if let Some(args) = Self::build_heartbeat(&rf) {
+                    for peer in &peers {
+                        tokio::spawn(Self::send_heartbeat(*peer, args.clone()));
                     }
-                });
+                }
             }
-        }
+        });
     }
 
     fn build_heartbeat(