Преглед изворни кода

Tick beats at successful RPC response.

Jing Yang пре 3 година
родитељ
комит
53f83a8828
3 измењених фајлова са 33 додато и 2 уклоњено
  1. 11 0
      src/heartbeats.rs
  2. 14 2
      src/sync_log_entries.rs
  3. 8 0
      src/verify_authority.rs

+ 11 - 0
src/heartbeats.rs

@@ -3,6 +3,7 @@ use std::time::Duration;
 
 use parking_lot::Mutex;
 
+use crate::beat_ticker::SharedBeatTicker;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, RPC_DEADLINE};
 use crate::{AppendEntriesArgs, Raft, RaftState, RemoteRaft};
@@ -34,6 +35,9 @@ where
                 let rf = self.inner_state.clone();
                 // A function that updates term with responses to heartbeats.
                 let term_marker = self.term_marker();
+                // A function that casts an "authoritative" vote with Ok()
+                // responses to heartbeats.
+                let beat_ticker = self.beat_ticker(peer_index);
                 // RPC client must be cloned into the outer async function.
                 let rpc_client = rpc_client.clone();
                 // Shutdown signal.
@@ -47,6 +51,7 @@ where
                                 rpc_client.clone(),
                                 args,
                                 term_marker.clone(),
+                                beat_ticker.clone(),
                             ));
                         }
                     }
@@ -87,7 +92,10 @@ where
         rpc_client: impl RemoteRaft<Command>,
         args: AppendEntriesArgs<Command>,
         term_watermark: TermMarker<Command>,
+        beat_ticker: SharedBeatTicker,
     ) -> std::io::Result<()> {
+        let term = args.term;
+        let beat = beat_ticker.next_beat();
         // Passing a reference that is moved to the following closure.
         //
         // It won't work if the rpc_client of type Arc is moved into the closure
@@ -109,6 +117,9 @@ where
             })
             .await?;
         term_watermark.mark(response.term);
+        if term == response.term {
+            beat_ticker.tick(beat);
+        }
         Ok(())
     }
 }

+ 14 - 2
src/sync_log_entries.rs

@@ -4,6 +4,7 @@ use std::time::Duration;
 
 use parking_lot::{Condvar, Mutex};
 
+use crate::beat_ticker::SharedBeatTicker;
 use crate::check_or_record;
 use crate::daemon_env::{Daemon, ErrorKind};
 use crate::index_term::IndexTerm;
@@ -101,6 +102,7 @@ where
                                 openings[i].0.clone(),
                                 this.apply_command_signal.clone(),
                                 this.term_marker(),
+                                this.beat_ticker(i),
                                 TaskNumber(task_number),
                             ));
                         }
@@ -168,6 +170,7 @@ where
         opening: Arc<AtomicUsize>,
         apply_command_signal: Arc<Condvar>,
         term_marker: TermMarker<Command>,
+        beat_ticker: SharedBeatTicker,
         task_number: TaskNumber,
     ) {
         if opening.swap(0, Ordering::SeqCst) == 0 {
@@ -181,7 +184,8 @@ where
                 let term = args.term;
                 let prev_log_index = args.prev_log_index;
                 let match_index = args.prev_log_index + args.entries.len();
-                let succeeded = Self::append_entries(&rpc_client, args).await;
+                let succeeded =
+                    Self::append_entries(&rpc_client, args, beat_ticker).await;
 
                 (term, prev_log_index, match_index, succeeded)
             }
@@ -189,7 +193,9 @@ where
                 let term = args.term;
                 let prev_log_index = args.last_included_index;
                 let match_index = args.last_included_index;
-                let succeeded = Self::install_snapshot(&rpc_client, args).await;
+                let succeeded =
+                    Self::install_snapshot(&rpc_client, args, beat_ticker)
+                        .await;
 
                 (term, prev_log_index, match_index, succeeded)
             }
@@ -452,8 +458,10 @@ where
     async fn append_entries(
         rpc_client: &dyn RemoteRaft<Command>,
         args: AppendEntriesArgs<Command>,
+        beat_ticker: SharedBeatTicker,
     ) -> std::io::Result<SyncLogEntriesResult> {
         let term = args.term;
+        let beat = beat_ticker.next_beat();
         let reply = retry_rpc(
             Self::APPEND_ENTRIES_RETRY,
             RPC_DEADLINE,
@@ -461,6 +469,7 @@ where
         )
         .await?;
         Ok(if reply.term == term {
+            beat_ticker.tick(beat);
             if let Some(committed) = reply.committed {
                 if reply.success {
                     SyncLogEntriesResult::Archived(committed)
@@ -492,8 +501,10 @@ where
     async fn install_snapshot(
         rpc_client: &dyn RemoteRaft<Command>,
         args: InstallSnapshotArgs,
+        beat_ticker: SharedBeatTicker,
     ) -> std::io::Result<SyncLogEntriesResult> {
         let term = args.term;
+        let beat = beat_ticker.next_beat();
         let reply = retry_rpc(
             Self::INSTALL_SNAPSHOT_RETRY,
             RPC_DEADLINE,
@@ -501,6 +512,7 @@ where
         )
         .await?;
         Ok(if reply.term == term {
+            beat_ticker.tick(beat);
             if let Some(committed) = reply.committed {
                 SyncLogEntriesResult::Archived(committed)
             } else {

+ 8 - 0
src/verify_authority.rs

@@ -281,6 +281,10 @@ impl VerifyAuthorityDaemon {
         }
     }
 
+    pub fn beat_ticker(&self, peer_index: usize) -> SharedBeatTicker {
+        self.beat_tickers[peer_index].clone()
+    }
+
     pub fn kill(&self) {
         if let Some(unparker) = self.unparker.as_ref() {
             unparker.unpark();
@@ -354,4 +358,8 @@ impl<Command: 'static + Send> Raft<Command> {
                 .expect("Verify authority daemon never drops senders")
         })
     }
+
+    pub(crate) fn beat_ticker(&self, peer_index: usize) -> SharedBeatTicker {
+        self.verify_authority_daemon.beat_ticker(peer_index)
+    }
 }