Selaa lähdekoodia

Send install snapshot request through apply command.

Jing Yang 4 vuotta sitten
vanhempi
commit
e949c5c550
5 muutettua tiedostoa jossa 40 lisäystä ja 19 poistoa
  1. 26 11
      src/apply_command.rs
  2. 1 0
      src/lib.rs
  3. 0 1
      src/log_array.rs
  4. 2 2
      src/rpcs.rs
  5. 11 5
      tests/config/mod.rs

+ 26 - 11
src/apply_command.rs

@@ -1,14 +1,19 @@
 use std::sync::atomic::Ordering;
 use std::time::Duration;
 
-use crate::{Index, Raft, HEARTBEAT_INTERVAL_MILLIS};
+use crate::{Index, Raft, Snapshot, HEARTBEAT_INTERVAL_MILLIS};
+
+pub enum ApplyCommandMessage<Command> {
+    Snapshot(Snapshot),
+    Command(Index, Command),
+}
 
 pub trait ApplyCommandFnMut<Command>:
-    'static + Send + FnMut(Index, Command)
+    'static + Send + FnMut(ApplyCommandMessage<Command>)
 {
 }
 
-impl<Command, T: 'static + Send + FnMut(Index, Command)>
+impl<Command, T: 'static + Send + FnMut(ApplyCommandMessage<Command>)>
     ApplyCommandFnMut<Command> for T
 {
 }
@@ -28,7 +33,7 @@ where
         let stop_wait_group = self.stop_wait_group.clone();
         std::thread::spawn(move || {
             while keep_running.load(Ordering::SeqCst) {
-                let (mut index, commands) = {
+                let messages = {
                     let mut rf = rf.lock();
                     if rf.last_applied >= rf.commit_index {
                         condvar.wait_for(
@@ -36,27 +41,37 @@ where
                             Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS),
                         );
                     }
-                    if rf.last_applied < rf.commit_index {
+                    if rf.last_applied < rf.log.start() {
+                        let (index_term, data) = rf.log.snapshot();
+                        vec![ApplyCommandMessage::Snapshot(Snapshot {
+                            last_included_index: index_term.index,
+                            data: data.to_vec(),
+                        })]
+                    } else if rf.last_applied < rf.commit_index {
                         let index = rf.last_applied + 1;
                         let last_one = rf.commit_index + 1;
-                        let commands: Vec<Command> = rf
+                        let messages: Vec<ApplyCommandMessage<Command>> = rf
                             .log
                             .between(index, last_one)
                             .iter()
-                            .map(|entry| entry.command.clone())
+                            .map(|entry| {
+                                ApplyCommandMessage::Command(
+                                    entry.index,
+                                    entry.command.clone(),
+                                )
+                            })
                             .collect();
                         rf.last_applied = rf.commit_index;
-                        (index, commands)
+                        messages
                     } else {
                         continue;
                     }
                 };
 
                 // Release the lock while calling external functions.
-                for command in commands {
-                    apply_command(index, command);
+                for message in messages {
+                    apply_command(message);
                     snapshot_daemon.trigger();
-                    index += 1;
                 }
             }
 

+ 1 - 0
src/lib.rs

@@ -17,6 +17,7 @@ use parking_lot::{Condvar, Mutex};
 use rand::{thread_rng, Rng};
 
 use crate::apply_command::ApplyCommandFnMut;
+pub use crate::apply_command::ApplyCommandMessage;
 use crate::install_snapshot::InstallSnapshotArgs;
 use crate::persister::PersistedRaftState;
 pub use crate::persister::Persister;

+ 0 - 1
src/log_array.rs

@@ -98,7 +98,6 @@ impl<C> LogArray<C> {
     }
 
     /// The snapshot before and including `start()`.
-    #[allow(dead_code)]
     pub fn snapshot(&self) -> (IndexTerm, &[u8]) {
         (self.first_index_term(), &self.snapshot)
     }

+ 2 - 2
src/rpcs.rs

@@ -131,7 +131,7 @@ mod tests {
 
     use bytes::Bytes;
 
-    use crate::{Peer, Term};
+    use crate::{ApplyCommandMessage, Peer, Term};
 
     use super::*;
 
@@ -164,7 +164,7 @@ mod tests {
                 vec![RpcClient(client)],
                 0,
                 Arc::new(()),
-                |_, _: i32| {},
+                |_: ApplyCommandMessage<i32>| {},
                 None,
                 Raft::<i32>::NO_SNAPSHOT,
             ));

+ 11 - 5
tests/config/mod.rs

@@ -9,7 +9,7 @@ use rand::{thread_rng, Rng};
 use tokio::time::Duration;
 
 use ruaft::rpcs::register_server;
-use ruaft::{Persister, Raft, RpcClient};
+use ruaft::{ApplyCommandMessage, Persister, Raft, RpcClient};
 
 pub mod persister;
 
@@ -312,8 +312,8 @@ impl Config {
             clients,
             index,
             persister,
-            move |cmd_index, cmd| {
-                Self::apply_command(log_clone.clone(), index, cmd_index, cmd)
+            move |message| {
+                Self::apply_command(log_clone.clone(), index, message)
             },
             None,
             Raft::<i32>::NO_SNAPSHOT,
@@ -380,9 +380,15 @@ impl Config {
     fn apply_command(
         log_state: Arc<Mutex<LogState>>,
         server_index: usize,
-        index: usize,
-        command: i32,
+        message: ApplyCommandMessage<i32>,
     ) {
+        let (index, command) =
+            if let ApplyCommandMessage::Command(index, command) = message {
+                (index, command)
+            } else {
+                // Ignore snapshots.
+                return;
+            };
         let mut log_state = log_state.lock();
         let committed_logs = &mut log_state.committed_logs;
         let mut err = None;