Преглед на файлове

Wrap data of the sync log entries daemon in opaque structs.

Jing Yang преди 3 години
родител
ревизия
0b29f77341
променени са 3 файла, в които са добавени 56 реда и са изтрити 20 реда
  1. src/election.rs (+5 −4)
  2. src/raft.rs (+8 −5)
  3. src/sync_log_entries.rs (+43 −11)

+ 5 - 4
src/election.rs

@@ -6,8 +6,9 @@ use parking_lot::{Condvar, Mutex};
 use rand::{thread_rng, Rng};
 
 use crate::daemon_env::Daemon;
+use crate::sync_log_entries::SyncLogEntriesComms;
 use crate::term_marker::TermMarker;
-use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
+use crate::utils::{retry_rpc, RPC_DEADLINE};
 use crate::verify_authority::VerifyAuthorityDaemon;
 use crate::{
     Peer, Persister, Raft, RaftState, RemoteRaft, ReplicableCommand,
@@ -270,7 +271,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             votes,
             rx,
             self.election.clone(),
-            self.new_log_entry.clone().unwrap(),
+            self.sync_log_entries_comms.clone(),
             self.verify_authority_daemon.clone(),
             self.persister.clone(),
         ));
@@ -306,7 +307,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
         cancel_token: futures_channel::oneshot::Receiver<()>,
         election: Arc<ElectionState>,
-        new_log_entry: SharedSender<Option<Peer>>,
+        new_log_entry: SyncLogEntriesComms,
         verify_authority_daemon: VerifyAuthorityDaemon,
         persister: Arc<dyn Persister>,
     ) {
@@ -380,7 +381,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             verify_authority_daemon.reset_state(term, sentinel_commit_index);
 
             // Sync all logs now.
-            let _ = new_log_entry.send(None);
+            new_log_entry.update_followers();
         }
     }
 }

+ 8 - 5
src/raft.rs

@@ -11,6 +11,7 @@ use crate::election::ElectionState;
 use crate::heartbeats::{HeartbeatsDaemon, HEARTBEAT_INTERVAL};
 use crate::persister::PersistedRaftState;
 use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
+use crate::sync_log_entries::SyncLogEntriesComms;
 use crate::verify_authority::VerifyAuthorityDaemon;
 use crate::{
     utils, IndexTerm, Persister, RaftState, RemoteRaft, ReplicableCommand,
@@ -33,7 +34,7 @@ pub struct Raft<Command> {
 
     pub(crate) persister: Arc<dyn Persister>,
 
-    pub(crate) new_log_entry: Option<utils::SharedSender<Option<Peer>>>,
+    pub(crate) sync_log_entries_comms: SyncLogEntriesComms,
     pub(crate) apply_command_signal: Arc<Condvar>,
     pub(crate) keep_running: Arc<AtomicBool>,
     pub(crate) election: Arc<ElectionState>,
@@ -103,13 +104,15 @@ impl<Command: ReplicableCommand> Raft<Command> {
             .into_iter()
             .map(|r| Arc::new(r) as Arc<dyn RemoteRaft<Command>>)
             .collect();
+        let (sync_log_entries_comms, sync_log_entries_daemon) =
+            crate::sync_log_entries::create();
 
         let mut this = Raft {
             inner_state: Arc::new(Mutex::new(state)),
             peers,
             me: Peer(me),
             persister,
-            new_log_entry: None,
+            sync_log_entries_comms,
             apply_command_signal: Arc::new(Condvar::new()),
             keep_running: Arc::new(AtomicBool::new(true)),
             election: Arc::new(election),
@@ -125,7 +128,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         // Running in a standalone thread.
         this.run_snapshot_daemon(max_state_size_bytes, request_snapshot);
         // Running in a standalone thread.
-        this.run_log_entry_daemon();
+        this.run_log_entry_daemon(sync_log_entries_daemon);
         // Running in a standalone thread.
         this.run_apply_command_daemon(apply_command);
         // One off function that schedules many little tasks, running on the
@@ -159,7 +162,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         self.persister.save_state(rf.persisted_state().into());
 
         // Several attempts have been made to remove the unwrap below.
-        let _ = self.new_log_entry.as_ref().unwrap().send(None);
+        let _ = self.sync_log_entries_comms.update_followers();
 
         log::info!("{:?} started new entry at {} {:?}", self.me, index, term);
         Some(IndexTerm::pack(index, term))
@@ -173,7 +176,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
     pub fn kill(mut self) {
         self.keep_running.store(false, Ordering::Release);
         self.election.stop_election_timer();
-        self.new_log_entry.take().map(|n| n.send(None));
+        self.sync_log_entries_comms.kill();
         self.apply_command_signal.notify_all();
         self.snapshot_daemon.kill();
         self.verify_authority_daemon.kill();

+ 43 - 11
src/sync_log_entries.rs

@@ -13,6 +13,39 @@ use crate::{
     Raft, RaftState, RemoteRaft, ReplicableCommand, Term,
 };
 
+#[derive(Clone)]
+pub(crate) struct SyncLogEntriesComms {
+    tx: crate::utils::SharedSender<Option<Peer>>,
+}
+
+impl SyncLogEntriesComms {
+    pub fn update_followers(&self) {
+        // Ignore the error. The log syncing thread must have died.
+        let _ = self.tx.send(None);
+    }
+
+    pub fn kill(&self) {
+        self.tx
+            .send(None)
+            .expect("The sync log entries daemon should still be alive");
+    }
+
+    fn rerun(&self, peer_index: usize) {
+        // Ignore the error. The log syncing thread must have died.
+        let _ = self.tx.send(Some(Peer(peer_index)));
+    }
+}
+
+pub(crate) struct SyncLogEntriesDaemon {
+    rx: std::sync::mpsc::Receiver<Option<Peer>>,
+}
+
+pub(crate) fn create() -> (SyncLogEntriesComms, SyncLogEntriesDaemon) {
+    let (tx, rx) = std::sync::mpsc::channel::<Option<Peer>>();
+    let tx = SharedSender::new(tx);
+    (SyncLogEntriesComms { tx }, SyncLogEntriesDaemon { rx })
+}
+
 #[repr(align(64))]
 struct Opening(Arc<AtomicUsize>);
 
@@ -54,11 +87,10 @@ impl<Command: ReplicableCommand> Raft<Command> {
     ///
     /// See comments on [`Raft::sync_log_entries`] to learn about the syncing
     /// and backoff strategy.
-    pub(crate) fn run_log_entry_daemon(&mut self) {
-        let (tx, rx) = std::sync::mpsc::channel::<Option<Peer>>();
-        let tx = SharedSender::new(tx);
-        self.new_log_entry.replace(tx);
-
+    pub(crate) fn run_log_entry_daemon(
+        &self,
+        SyncLogEntriesDaemon { rx }: SyncLogEntriesDaemon,
+    ) {
         // Clone everything that the thread needs.
         let this = self.clone();
         let sync_log_entry_daemon = move || {
@@ -89,7 +121,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                                 this.inner_state.clone(),
                                 rpc_client.clone(),
                                 i,
-                                this.new_log_entry.clone().unwrap(),
+                                this.sync_log_entries_comms.clone(),
                                 openings[i].0.clone(),
                                 this.apply_command_signal.clone(),
                                 this.term_marker(),
@@ -152,7 +184,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         rf: Arc<Mutex<RaftState<Command>>>,
         rpc_client: impl RemoteRaft<Command>,
         peer_index: usize,
-        rerun: SharedSender<Option<Peer>>,
+        comms: SyncLogEntriesComms,
         opening: Arc<AtomicUsize>,
         apply_command_signal: Arc<Condvar>,
         term_marker: TermMarker<Command>,
@@ -260,7 +292,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 if prev_log_index == match_index {
                     // If we did not make any progress this time, try again.
                     // This can only happen when installing snapshots.
-                    let _ = rerun.send(Some(Peer(peer_index)));
+                    comms.rerun(peer_index);
                 }
             }
             Ok(SyncLogEntriesResult::Archived(committed)) => {
@@ -289,7 +321,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 rf.next_index[peer_index] = committed.index + 1;
 
                 // Ignore the error. The log syncing thread must have died.
-                let _ = rerun.send(Some(Peer(peer_index)));
+                comms.rerun(peer_index);
             }
             Ok(SyncLogEntriesResult::Diverged(committed)) => {
                 let mut rf = rf.lock();
@@ -326,7 +358,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 }
 
                 // Ignore the error. The log syncing thread must have died.
-                let _ = rerun.send(Some(Peer(peer_index)));
+                comms.rerun(peer_index);
             }
             // Do nothing, not our term anymore.
             Ok(SyncLogEntriesResult::TermElapsed(term)) => {
@@ -335,7 +367,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             Err(_) => {
                 tokio::time::sleep(HEARTBEAT_INTERVAL).await;
                 // Ignore the error. The log syncing thread must have died.
-                let _ = rerun.send(Some(Peer(peer_index)));
+                comms.rerun(peer_index);
             }
         };
     }