Pārlūkot izejas kodu

Reduce the frequency of heartbeats when there are pending commits.

If there are pending commits which are about to be committed, we can
rely on new commits to prove authority.

This change reduced CPU usage for about 10%, can support a 1.5x rate
of read only requests when the network is good. It does not affect
capacity when the network is unreliable. Overall this is a pure
improvement in all aspects.
Jing Yang 3 gadi atpakaļ
vecāks
revīzija
db0696517e
2 mainītis faili ar 37 papildinājumiem un 8 dzēšanām
  1. 29 5
      src/heartbeats.rs
  2. 8 3
      src/verify_authority.rs

+ 29 - 5
src/heartbeats.rs

@@ -1,5 +1,6 @@
-use std::sync::atomic::Ordering;
-use std::time::Duration;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
+use std::time::{Duration, Instant};
 
 use parking_lot::Mutex;
 
@@ -10,17 +11,40 @@ use crate::{AppendEntriesArgs, Raft, RaftState, RemoteRaft};
 
 #[derive(Clone)]
 pub(crate) struct HeartbeatsDaemon {
+    start: Instant,
+    last_trigger: Arc<AtomicU64>,
     sender: tokio::sync::broadcast::Sender<()>,
 }
 
 impl HeartbeatsDaemon {
+    const HEARTBEAT_MAX_DELAY_MILLIS: u64 = 30;
+
     pub fn create() -> Self {
         let (sender, _) = tokio::sync::broadcast::channel(1);
-        Self { sender }
+        Self {
+            start: Instant::now(),
+            last_trigger: Arc::new(AtomicU64::new(0)),
+            sender,
+        }
     }
 
-    pub fn trigger(&self) {
-        let _ = self.sender.send(());
+    pub fn trigger(&self, force: bool) {
+        let now = self.start.elapsed().as_millis();
+        // u64 is big enough for more than 500 million years.
+        let now_lower_bits = (now & (u64::MAX) as u128) as u64;
+        let last_trigger = self.last_trigger.load(Ordering::Acquire);
+        let next_trigger =
+            last_trigger.wrapping_add(Self::HEARTBEAT_MAX_DELAY_MILLIS);
+
+        // Do not trigger heartbeats too frequently, unless we are forced.
+        if force || next_trigger < now_lower_bits {
+            let previous_trigger = self
+                .last_trigger
+                .fetch_max(now_lower_bits, Ordering::AcqRel);
+            if last_trigger == previous_trigger {
+                let _ = self.sender.send(());
+            }
+        }
     }
 }
 

+ 8 - 3
src/verify_authority.rs

@@ -425,7 +425,7 @@ impl<Command: 'static + Send> Raft<Command> {
             return None;
         }
 
-        let (term, commit_index) = {
+        let (term, commit_index, last_index) = {
             let rf = self.inner_state.lock();
             if !rf.is_leader() {
                 // Returning none instead of `Pending::Ready(TermElapsed)`,
@@ -434,12 +434,17 @@ impl<Command: 'static + Send> Raft<Command> {
                 return None;
             }
 
-            (rf.current_term, rf.commit_index)
+            (
+                rf.current_term,
+                rf.commit_index,
+                rf.log.last_index_term().index,
+            )
         };
         let receiver = self
             .verify_authority_daemon
             .verify_authority_async(term, commit_index);
-        self.heartbeats_daemon.trigger();
+        let force_heartbeat = commit_index == last_index;
+        self.heartbeats_daemon.trigger(force_heartbeat);
         receiver.map(|receiver| async move {
             receiver
                 .await