Browse Source

Create an explicit enum for sync log entry events.

Jing Yang 3 năm trước cách đây
mục cha
commit
2f66c3fa08
3 tập tin đã thay đổi với 32 bổ sung14 xóa
  1. 1 1
      src/election.rs
  2. 1 1
      src/raft.rs
  3. 30 12
      src/sync_log_entries.rs

+ 1 - 1
src/election.rs

@@ -381,7 +381,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             verify_authority_daemon.reset_state(term, sentinel_commit_index);
 
             // Sync all logs now.
-            new_log_entry.update_followers();
+            new_log_entry.update_followers(sentinel_commit_index);
         }
     }
 }

+ 1 - 1
src/raft.rs

@@ -162,7 +162,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         self.persister.save_state(rf.persisted_state().into());
 
         // Several attempts have been made to remove the unwrap below.
-        let _ = self.sync_log_entries_comms.update_followers();
+        let _ = self.sync_log_entries_comms.update_followers(index);
 
         log::info!("{:?} started new entry at {} {:?}", self.me, index, term);
         Some(IndexTerm::pack(index, term))

+ 30 - 12
src/sync_log_entries.rs

@@ -9,39 +9,58 @@ use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
 use crate::verify_authority::DaemonBeatTicker;
 use crate::{
-    check_or_record, AppendEntriesArgs, IndexTerm, InstallSnapshotArgs, Peer,
-    Raft, RaftState, RemoteRaft, ReplicableCommand, Term,
+    check_or_record, AppendEntriesArgs, Index, IndexTerm, InstallSnapshotArgs,
+    Peer, Raft, RaftState, RemoteRaft, ReplicableCommand, Term,
 };
 
+#[derive(Clone)]
+enum Event {
+    NewTerm(Term),
+    NewLogEntry(Index),
+    Rerun(Peer),
+    Shutdown,
+}
+
+impl Event {
+    fn should_schedule(&self, peer: Peer) -> bool {
+        match self {
+            Event::NewTerm(_term) => true,
+            Event::NewLogEntry(_index) => true,
+            Event::Rerun(p) => p == &peer,
+            Event::Shutdown => false,
+        }
+    }
+}
+
 #[derive(Clone)]
 pub(crate) struct SyncLogEntriesComms {
-    tx: crate::utils::SharedSender<Option<Peer>>,
+    tx: crate::utils::SharedSender<Event>,
 }
 
 impl SyncLogEntriesComms {
-    pub fn update_followers(&self) {
+    pub fn update_followers(&self, index: Index) {
         // Ignore the error. The log syncing thread must have died.
-        let _ = self.tx.send(None);
+        let _ = self.tx.send(Event::NewLogEntry(index));
     }
 
     pub fn kill(&self) {
         self.tx
-            .send(None)
+            .send(Event::Shutdown)
             .expect("The sync log entries daemon should still be alive");
     }
 
     fn rerun(&self, peer_index: usize) {
         // Ignore the error. The log syncing thread must have died.
-        let _ = self.tx.send(Some(Peer(peer_index)));
+        let _ = self.tx.send(Event::Rerun(Peer(peer_index)));
     }
 }
 
 pub(crate) struct SyncLogEntriesDaemon {
-    rx: std::sync::mpsc::Receiver<Option<Peer>>,
+    rx: std::sync::mpsc::Receiver<Event>,
 }
 
 pub(crate) fn create() -> (SyncLogEntriesComms, SyncLogEntriesDaemon) {
-    let (tx, rx) = std::sync::mpsc::channel::<Option<Peer>>();
+    let (tx, rx) = std::sync::mpsc::channel();
     let tx = SharedSender::new(tx);
     (SyncLogEntriesComms { tx }, SyncLogEntriesDaemon { rx })
 }
@@ -103,7 +122,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             let openings = openings; // Not mutable beyond this point.
 
             let mut task_number = 0;
-            while let Ok(peer) = rx.recv() {
+            while let Ok(event) = rx.recv() {
                 if !this.keep_running.load(Ordering::Relaxed) {
                     break;
                 }
@@ -111,8 +130,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                     continue;
                 }
                 for (i, rpc_client) in this.peers.iter().enumerate() {
-                    if i != this.me.0 && peer.map(|p| p.0 == i).unwrap_or(true)
-                    {
+                    if i != this.me.0 && event.should_schedule(Peer(i)) {
                         // Only schedule a new task if the last task has cleared
                         // the queue of RPC requests.
                         if openings[i].0.fetch_add(1, Ordering::AcqRel) == 0 {