Ver Fonte

Sync log entries when commited.

The raft struct now holds a log entry syncing channel, which is
unfortunately not Send or Sync, only Clone.
Jing Yang há 5 anos atrás
pai
commit
ded4bce012
1 ficheiros alterados com 13 adições e 11 exclusões
  1. 13 11
      src/lib.rs

+ 13 - 11
src/lib.rs

@@ -95,8 +95,7 @@ struct Raft {
 
     me: Peer,
 
-    // new_log_entry: Sender<usize>,
-    // new_log_entry: Receiver<usize>,
+    new_log_entry: Option<std::sync::mpsc::Sender<Option<Peer>>>,
     // apply_command_cond: Condvar
     keep_running: Arc<AtomicBool>,
     // applyCh: Sender<ApplyMsg>
@@ -303,6 +302,7 @@ impl Raft {
             votes,
             self.peers.len() / 2,
             cancel_token,
+            self.new_log_entry.clone().unwrap(),
         ));
     }
 
@@ -328,6 +328,7 @@ impl Raft {
         votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
         majority: usize,
         cancel_token: futures::channel::oneshot::Receiver<Term>,
+        new_log_entry: std::sync::mpsc::Sender<Option<Peer>>,
     ) {
         let mut vote_count = 0;
         let mut against_count = 0;
@@ -370,7 +371,12 @@ impl Raft {
             for item in rf.match_index.iter_mut() {
                 *item = 0;
             }
-            // TODO: send heartbeats.
+            for item in rf.current_step.iter_mut() {
+                *item = 0;
+            }
+            // Sync all logs now.
+            new_log_entry.send(None);
+
             // Drop the timer and cancel token.
             rf.election_cancel_token.take();
             rf.election_timer.take();
@@ -439,16 +445,12 @@ impl Raft {
         Ok(())
     }
 
-    fn run_log_entry_daemon(
-        &self,
-    ) -> (
-        std::thread::JoinHandle<()>,
-        std::sync::mpsc::Sender<Option<Peer>>,
-    ) {
+    fn run_log_entry_daemon(&mut self) -> std::thread::JoinHandle<()> {
         let (tx, rx) = std::sync::mpsc::channel::<Option<Peer>>();
+        self.new_log_entry.replace(tx.clone());
 
         // Clone everything that the thread needs.
-        let rerun = tx.clone();
+        let rerun = tx;
         let peers = self.peers.clone();
         let rf = self.inner_state.clone();
         let me = self.me;
@@ -471,7 +473,7 @@ impl Raft {
             }
         });
 
-        (handle, tx)
+        handle
     }
 
     async fn sync_log_entry(