Przeglądaj źródła

Add arguments for snapshot taking.

Jing Yang 5 lat temu
rodzic
commit
165b273fec
3 zmienionych plików z 28 dodań i 12 usunięć
  1. 9 8
      src/lib.rs
  2. 6 0
      src/rpcs.rs
  3. 13 4
      tests/config/mod.rs

+ 9 - 8
src/lib.rs

@@ -22,7 +22,8 @@ pub use crate::persister::Persister;
 pub(crate) use crate::raft_state::RaftState;
 pub(crate) use crate::raft_state::State;
 pub use crate::rpcs::RpcClient;
-use crate::snapshot::{Snapshot, SnapshotDaemon};
+pub use crate::snapshot::Snapshot;
+use crate::snapshot::SnapshotDaemon;
 use crate::utils::retry_rpc;
 
 mod index_term;
@@ -131,14 +132,17 @@ where
     ///
     /// Each instance will create at least 3 + (number of peers) threads. The
     /// extensive usage of threads is to minimize latency.
-    pub fn new<Func>(
+    pub fn new<ApplyCommandFunc, RequestSnapshotFunc>(
         peers: Vec<RpcClient>,
         me: usize,
         persister: Arc<dyn Persister>,
-        apply_command: Func,
+        apply_command: ApplyCommandFunc,
+        max_state_size_bytes: Option<usize>,
+        request_snapshot: RequestSnapshotFunc,
     ) -> Self
     where
-        Func: 'static + Send + FnMut(Index, Command),
+        ApplyCommandFunc: 'static + Send + FnMut(Index, Command),
+        RequestSnapshotFunc: 'static + Send + FnMut(Index) -> Snapshot,
     {
         let peer_size = peers.len();
         let mut state = RaftState {
@@ -202,10 +206,7 @@ where
         ));
         // The last step is to start running election timer.
         this.run_election_timer();
-        this.run_snapshot_daemon(Some(1 << 20), |index| Snapshot {
-            last_included_index: index,
-            data: vec![],
-        });
+        this.run_snapshot_daemon(max_state_size_bytes, request_snapshot);
         this
     }
 }

+ 6 - 0
src/rpcs.rs

@@ -165,6 +165,7 @@ mod tests {
     use crate::{Peer, Term};
 
     use super::*;
+    use crate::snapshot::Snapshot;
 
     type DoNothingPersister = ();
     impl crate::Persister for DoNothingPersister {
@@ -196,6 +197,11 @@ mod tests {
                 0,
                 Arc::new(()),
                 |_, _: i32| {},
+                None,
+                |index| Snapshot {
+                    last_included_index: index,
+                    data: vec![],
+                },
             ));
             register_server(raft, name, network.as_ref())?;
 

+ 13 - 4
tests/config/mod.rs

@@ -8,7 +8,7 @@ use rand::{thread_rng, Rng};
 use tokio::time::Duration;
 
 use ruaft::rpcs::register_server;
-use ruaft::{Persister, Raft, RpcClient};
+use ruaft::{Persister, Raft, RpcClient, Snapshot};
 
 pub mod persister;
 
@@ -307,10 +307,19 @@ impl Config {
         let persister = self.log.lock().saved[index].clone();
 
         let log_clone = self.log.clone();
-        let raft =
-            Raft::new(clients, index, persister, move |cmd_index, cmd| {
+        let raft = Raft::new(
+            clients,
+            index,
+            persister,
+            move |cmd_index, cmd| {
                 Self::apply_command(log_clone.clone(), index, cmd_index, cmd)
-            });
+            },
+            None,
+            |index| Snapshot {
+                last_included_index: index,
+                data: vec![],
+            },
+        );
         self.state.lock().rafts[index].replace(raft.clone());
 
         let raft = Arc::new(raft);