Bläddra i källkod

Add persister to lib, and tests.

Jing Yang 5 år sedan
förälder
incheckning
93d36abc85
4 ändrade filer med 91 tillägg och 22 borttagningar
  1. 10 15
      src/lib.rs
  2. 6 1
      src/persister.rs
  3. 17 6
      tests/config/mod.rs
  4. 58 0
      tests/persist_tests.rs

+ 10 - 15
src/lib.rs

@@ -18,7 +18,7 @@ use rand::{thread_rng, Rng};
 use crate::persister::PersistedRaftState;
 pub use crate::persister::Persister;
 pub use crate::rpcs::RpcClient;
-use crate::utils::{retry_rpc, DropGuard};
+use crate::utils::retry_rpc;
 
 mod persister;
 pub mod rpcs;
@@ -222,7 +222,7 @@ impl Raft {
             rf.state = State::Follower;
 
             self.election.reset_election_timer();
-            rf.persist();
+            self.persister.save_state(rf.persisted_state().into());
         }
 
         let voted_for = rf.voted_for;
@@ -238,7 +238,7 @@ impl Raft {
             // current term. It does not hurt to update the timer again.
             // We do need to persist, though.
             self.election.reset_election_timer();
-            rf.persist();
+            self.persister.save_state(rf.persisted_state().into());
 
             RequestVoteReply {
                 term: args.term,
@@ -264,11 +264,10 @@ impl Raft {
             };
         }
 
-        let _ = rf.deferred_persist();
-
         if rf.current_term < args.term {
             rf.current_term = args.term;
             rf.voted_for = None;
+            self.persister.save_state(rf.persisted_state().into());
         }
 
         rf.state = State::Follower;
@@ -297,6 +296,8 @@ impl Raft {
             }
         }
 
+        self.persister.save_state(rf.persisted_state().into());
+
         if args.leader_commit > rf.commit_index {
             rf.commit_index = if args.leader_commit < rf.log.len() {
                 args.leader_commit
@@ -401,7 +402,7 @@ impl Raft {
             rf.voted_for = Some(me);
             rf.state = State::Candidate;
 
-            rf.persist();
+            self.persister.save_state(rf.persisted_state().into());
 
             let term = rf.current_term;
             let (last_log_index, last_log_term) = rf.last_log_index_and_term();
@@ -522,7 +523,6 @@ impl Raft {
             new_log_entry
                 .send(None)
                 .expect("Triggering log entry syncing should not fail");
-            rf.persist();
         }
     }
 
@@ -779,7 +779,7 @@ impl Raft {
             index,
             command,
         });
-        rf.persist();
+        self.persister.save_state(rf.persisted_state().into());
 
         self.new_log_entry
             .clone()
@@ -803,7 +803,6 @@ impl Raft {
             .shutdown_timeout(Duration::from_millis(
                 HEARTBEAT_INTERVAL_MILLIS * 2,
             ));
-        self.inner_state.lock().persist();
     }
 
     pub fn get_state(&self) -> (Term, bool) {
@@ -813,12 +812,8 @@ impl Raft {
 }
 
 impl RaftState {
-    fn persist(&self) {
-        // TODO: implement
-    }
-
-    fn deferred_persist(&self) -> impl Drop + '_ {
-        DropGuard::new(move || self.persist())
+    fn persisted_state(&self) -> PersistedRaftState {
+        self.into()
     }
 
     fn last_log_index_and_term(&self) -> (Index, Term) {

+ 6 - 1
src/persister.rs

@@ -18,7 +18,12 @@ pub(crate) struct PersistedRaftState {
 
 impl<T: AsRef<RaftState>> From<T> for PersistedRaftState {
     fn from(raft_state: T) -> Self {
-        let raft_state = raft_state.as_ref();
+        Self::from(raft_state.as_ref())
+    }
+}
+
+impl From<&RaftState> for PersistedRaftState {
+    fn from(raft_state: &RaftState) -> Self {
         Self {
             current_term: raft_state.current_term,
             voted_for: raft_state.voted_for,

+ 17 - 6
tests/config/mod.rs

@@ -9,7 +9,7 @@ use tokio::time::Duration;
 
 use ruaft::rpcs::register_server;
 use ruaft::utils::DropGuard;
-use ruaft::{Raft, RpcClient};
+use ruaft::{Persister, Raft, RpcClient};
 
 pub mod persister;
 
@@ -29,6 +29,7 @@ pub struct Config {
     server_count: usize,
     state: Mutex<ConfigState>,
     log: Arc<Mutex<LogState>>,
+    saved: std::cell::RefCell<Vec<Arc<persister::Persister>>>,
 }
 
 impl Config {
@@ -265,7 +266,7 @@ impl Config {
         }
     }
 
-    pub fn crash1(&mut self, index: usize) {
+    pub fn crash1(&self, index: usize) {
         self.disconnect(index);
 
         unlock(self.network.as_ref()).remove_server(Self::server_name(index));
@@ -273,12 +274,16 @@ impl Config {
             let mut state = self.state.lock();
             state.rafts[index].take()
         };
+        let mut saved = self.saved.borrow_mut();
+        let data = saved[index].read_state();
         if let Some(raft) = raft {
             raft.kill();
         }
+        saved[index] = Arc::new(persister::Persister::new());
+        saved[index].save_state(data);
     }
 
-    pub fn start1(&mut self, index: usize) -> Result<()> {
+    pub fn start1(&self, index: usize) -> Result<()> {
         if self.state.lock().rafts[index].is_some() {
             self.crash1(index);
         }
@@ -293,7 +298,7 @@ impl Config {
                 )))
             }
         }
-        let persister = Arc::new(persister::Persister::new());
+        let persister = self.saved.borrow()[index].clone();
 
         let log_clone = self.log.clone();
         let raft =
@@ -362,7 +367,7 @@ impl Config {
                 err = Some((
                     one_index,
                     Err(anyhow!(
-                        "commit index ={} server={} {} != server={} {}",
+                        "commit index={} server={} {} != server={} {}",
                         index,
                         server_index,
                         command,
@@ -418,11 +423,17 @@ pub fn make_config(server_count: usize, unreliable: bool) -> Config {
         max_index: 0,
     }));
     log.lock().results.resize_with(server_count, || Ok(()));
-    let mut cfg = Config {
+
+    let saved = std::cell::RefCell::new(vec![]);
+    saved
+        .borrow_mut()
+        .resize_with(server_count, || Arc::new(persister::Persister::new()));
+    let cfg = Config {
         network,
         server_count,
         state,
         log,
+        saved,
     };
 
     for i in 0..server_count {

+ 58 - 0
tests/persist_tests.rs

@@ -0,0 +1,58 @@
+#[macro_use]
+extern crate anyhow;
+extern crate bytes;
+extern crate labrpc;
+extern crate ruaft;
+
+mod config;
+
+#[test]
+fn persist() -> config::Result<()> {
+    const SERVERS: usize = 5;
+    let cfg = config::make_config(SERVERS, false);
+    let _guard = cfg.deferred_cleanup();
+
+    cfg.begin("Test (2C): basic persistence");
+
+    cfg.one(11, SERVERS, true)?;
+
+    // crash and re-start all
+    for i in 0..SERVERS {
+        cfg.start1(i)?;
+    }
+    for i in 0..SERVERS {
+        cfg.disconnect(i);
+        cfg.connect(i);
+    }
+
+    cfg.one(12, SERVERS, true);
+
+    let leader1 = cfg.check_one_leader()?;
+    cfg.disconnect(leader1);
+    cfg.start1(leader1)?;
+    cfg.connect(leader1);
+
+    cfg.one(13, SERVERS, true)?;
+
+    let leader2 = cfg.check_one_leader()?;
+    cfg.disconnect(leader2);
+    cfg.one(14, SERVERS - 1, true);
+    cfg.start1(leader2)?;
+    cfg.connect(leader2);
+
+    // wait for leader2 to join before killing i3
+    cfg.wait(4, SERVERS, None)?;
+
+    let i3 = (cfg.check_one_leader()? + 1) % SERVERS;
+    cfg.disconnect(i3);
+    cfg.one(15, SERVERS - 1, true)?;
+    cfg.start1(i3)?;
+    cfg.connect(i3);
+
+    cfg.one(16, SERVERS, true)?;
+
+    cfg.end();
+
+    drop(_guard);
+    Ok(())
+}