Преглед изворни кода

Add the apply command interface.

Jing Yang пре 5 година
родитељ
комит
0eb81c7d23
1 измењених фајлова са 49 додато и 4 уклоњено
  1. 49 4
      src/lib.rs

+ 49 - 4
src/lib.rs

@@ -13,7 +13,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
-use parking_lot::Mutex;
+use parking_lot::{Condvar, Mutex};
 use rand::{thread_rng, Rng};
 
 use crate::rpcs::RpcClient;
@@ -96,7 +96,7 @@ struct Raft {
     me: Peer,
 
     new_log_entry: Option<std::sync::mpsc::Sender<Option<Peer>>>,
-    // apply_command_cond: Condvar
+    apply_command_signal: Arc<Condvar>,
     keep_running: Arc<AtomicBool>,
     // applyCh: Sender<ApplyMsg>
 }
@@ -240,7 +240,7 @@ impl Raft {
             } else {
                 rf.log.len() - 1
             };
-            // TODO: apply commands.
+            self.apply_command_signal.notify_one();
         }
 
         AppendEntriesReply {
@@ -455,6 +455,7 @@ impl Raft {
         let rf = self.inner_state.clone();
         let me = self.me;
         let keep_running = self.keep_running.clone();
+        let apply_command_signal = self.apply_command_signal.clone();
         let handle = std::thread::spawn(move || {
             while let Ok(peer) = rx.recv() {
                 for (i, rpc_client) in peers.iter().enumerate() {
@@ -464,6 +465,7 @@ impl Raft {
                             rpc_client.clone(),
                             i,
                             rerun.clone(),
+                            apply_command_signal.clone(),
                         ));
                     }
                 }
@@ -481,6 +483,7 @@ impl Raft {
         rpc_client: RpcClient,
         peer_index: usize,
         rerun: std::sync::mpsc::Sender<Option<Peer>>,
+        apply_command_signal: Arc<Condvar>,
     ) {
         // TODO: cancel in flight changes?
         let args = Self::build_append_entries(&rf, peer_index);
@@ -505,7 +508,7 @@ impl Raft {
                                     == rf.current_term
                             {
                                 rf.commit_index = new_commit_index;
-                                // TODO: apply command.
+                                apply_command_signal.notify_one();
                             }
                         }
                     }
@@ -570,6 +573,48 @@ impl Raft {
             None
         })
     }
+
+    fn run_apply_command_daemon<Func>(
+        &self,
+        mut apply_command: Func,
+    ) -> std::thread::JoinHandle<()>
+    where
+        Func: 'static + Send + FnMut(usize, Command) -> (),
+    {
+        let keep_running = self.keep_running.clone();
+        let rf = self.inner_state.clone();
+        let condvar = self.apply_command_signal.clone();
+        std::thread::spawn(move || {
+            while keep_running.load(Ordering::SeqCst) {
+                let (mut index, commands) = {
+                    let mut rf = rf.lock();
+                    if rf.last_applied >= rf.commit_index {
+                        condvar.wait_for(
+                            &mut rf,
+                            Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS),
+                        );
+                    }
+                    if rf.last_applied < rf.commit_index {
+                        rf.last_applied += 1;
+                        let index = rf.last_applied;
+                        let commands: Vec<Command> = rf.log[index..]
+                            .iter()
+                            .map(|entry| entry.command)
+                            .collect();
+                        (index, commands)
+                    } else {
+                        continue;
+                    }
+                };
+
+                // Release the lock while calling external functions.
+                for command in commands {
+                    apply_command(index, command);
+                    index += 1;
+                }
+            }
+        })
+    }
 }
 
 const HEARTBEAT_INTERVAL_MILLIS: u64 = 150;