Browse Source

Move apply_command to its own module.

Jing Yang 4 years ago
parent
commit
5af7e3b05e
2 changed files with 71 additions and 54 deletions
  1. 66 0
      src/apply_command.rs
  2. 5 54
      src/lib.rs

+ 66 - 0
src/apply_command.rs

@@ -0,0 +1,66 @@
+use std::sync::atomic::Ordering;
+use std::time::Duration;
+
+use crate::{Index, Raft, HEARTBEAT_INTERVAL_MILLIS};
+
+pub trait ApplyCommandFnMut<Command>:
+    'static + Send + FnMut(Index, Command)
+{
+}
+
+impl<Command, T: 'static + Send + FnMut(Index, Command)>
+    ApplyCommandFnMut<Command> for T
+{
+}
+
+impl<Command> Raft<Command>
+where
+    Command: 'static + Clone + Send,
+{
+    pub(crate) fn run_apply_command_daemon(
+        &self,
+        mut apply_command: impl ApplyCommandFnMut<Command>,
+    ) -> std::thread::JoinHandle<()> {
+        let keep_running = self.keep_running.clone();
+        let rf = self.inner_state.clone();
+        let condvar = self.apply_command_signal.clone();
+        let snapshot_daemon = self.snapshot_daemon.clone();
+        let stop_wait_group = self.stop_wait_group.clone();
+        std::thread::spawn(move || {
+            while keep_running.load(Ordering::SeqCst) {
+                let (mut index, commands) = {
+                    let mut rf = rf.lock();
+                    if rf.last_applied >= rf.commit_index {
+                        condvar.wait_for(
+                            &mut rf,
+                            Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS),
+                        );
+                    }
+                    if rf.last_applied < rf.commit_index {
+                        let index = rf.last_applied + 1;
+                        let last_one = rf.commit_index + 1;
+                        let commands: Vec<Command> = rf
+                            .log
+                            .between(index, last_one)
+                            .iter()
+                            .map(|entry| entry.command.clone())
+                            .collect();
+                        rf.last_applied = rf.commit_index;
+                        (index, commands)
+                    } else {
+                        continue;
+                    }
+                };
+
+                // Release the lock while calling external functions.
+                for command in commands {
+                    apply_command(index, command);
+                    snapshot_daemon.trigger();
+                    index += 1;
+                }
+            }
+
+            drop(stop_wait_group);
+        })
+    }
+}

+ 5 - 54
src/lib.rs

@@ -16,6 +16,7 @@ use crossbeam_utils::sync::WaitGroup;
 use parking_lot::{Condvar, Mutex};
 use rand::{thread_rng, Rng};
 
+use crate::apply_command::ApplyCommandFnMut;
 use crate::install_snapshot::InstallSnapshotArgs;
 use crate::persister::PersistedRaftState;
 pub use crate::persister::Persister;
@@ -26,6 +27,7 @@ pub use crate::snapshot::Snapshot;
 use crate::snapshot::SnapshotDaemon;
 use crate::utils::retry_rpc;
 
+mod apply_command;
 mod index_term;
 mod install_snapshot;
 mod log_array;
@@ -131,16 +133,15 @@ where
     ///
     /// Each instance will create at least 3 + (number of peers) threads. The
     /// extensive usage of threads is to minimize latency.
-    pub fn new<ApplyCommandFunc, RequestSnapshotFunc>(
+    pub fn new<RequestSnapshotFunc>(
         peers: Vec<RpcClient>,
         me: usize,
         persister: Arc<dyn Persister>,
-        apply_command: ApplyCommandFunc,
+        apply_command: impl ApplyCommandFnMut<Command>,
         max_state_size_bytes: Option<usize>,
         request_snapshot: RequestSnapshotFunc,
     ) -> Self
     where
-        ApplyCommandFunc: 'static + Send + FnMut(Index, Command),
         RequestSnapshotFunc: 'static + Send + FnMut(Index) -> Snapshot,
     {
         let peer_size = peers.len();
@@ -836,56 +837,6 @@ where
         })
     }
 
-    fn run_apply_command_daemon<Func>(
-        &self,
-        mut apply_command: Func,
-    ) -> std::thread::JoinHandle<()>
-    where
-        Func: 'static + Send + FnMut(Index, Command),
-    {
-        let keep_running = self.keep_running.clone();
-        let rf = self.inner_state.clone();
-        let condvar = self.apply_command_signal.clone();
-        let snapshot_daemon = self.snapshot_daemon.clone();
-        let stop_wait_group = self.stop_wait_group.clone();
-        std::thread::spawn(move || {
-            while keep_running.load(Ordering::SeqCst) {
-                let (mut index, commands) = {
-                    let mut rf = rf.lock();
-                    if rf.last_applied >= rf.commit_index {
-                        condvar.wait_for(
-                            &mut rf,
-                            Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS),
-                        );
-                    }
-                    if rf.last_applied < rf.commit_index {
-                        let index = rf.last_applied + 1;
-                        let last_one = rf.commit_index + 1;
-                        let commands: Vec<Command> = rf
-                            .log
-                            .between(index, last_one)
-                            .iter()
-                            .map(|entry| entry.command.clone())
-                            .collect();
-                        rf.last_applied = rf.commit_index;
-                        (index, commands)
-                    } else {
-                        continue;
-                    }
-                };
-
-                // Release the lock while calling external functions.
-                for command in commands {
-                    apply_command(index, command);
-                    snapshot_daemon.trigger();
-                    index += 1;
-                }
-            }
-
-            drop(stop_wait_group);
-        })
-    }
-
     pub fn start(&self, command: Command) -> Option<(Term, Index)> {
         let mut rf = self.inner_state.lock();
         let term = rf.current_term;
@@ -923,7 +874,7 @@ where
     }
 }
 
-const HEARTBEAT_INTERVAL_MILLIS: u64 = 150;
+pub(crate) const HEARTBEAT_INTERVAL_MILLIS: u64 = 150;
 const ELECTION_TIMEOUT_BASE_MILLIS: u64 = 150;
 const ELECTION_TIMEOUT_VAR_MILLIS: u64 = 250;
 const RPC_DEADLINE: Duration = Duration::from_secs(2);