Sfoglia il codice sorgente

Implement persister.

Jing Yang 5 anni fa
parent
commit
fa74454870

+ 1 - 0
Cargo.toml

@@ -8,6 +8,7 @@ edition = "2018"
 
 [dependencies]
 bincode = "1.3.1"
+bytes = "0.5.6"
 crossbeam-utils = "0.7.2"
 futures = { version = "0.3.5" }
 labrpc = { path = "../labrpc" }

+ 18 - 3
src/lib.rs

@@ -6,17 +6,21 @@ extern crate rand;
 extern crate serde_derive;
 extern crate tokio;
 
+use std::convert::TryFrom;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
+use crossbeam_utils::sync::WaitGroup;
 use parking_lot::{Condvar, Mutex};
 use rand::{thread_rng, Rng};
 
+use crate::persister::PersistedRaftState;
+pub use crate::persister::Persister;
 pub use crate::rpcs::RpcClient;
 use crate::utils::{retry_rpc, DropGuard};
-use crossbeam_utils::sync::WaitGroup;
 
+mod persister;
 pub mod rpcs;
 pub mod utils;
 
@@ -79,6 +83,8 @@ pub struct Raft {
 
     me: Peer,
 
+    persister: Arc<dyn Persister>,
+
     new_log_entry: Option<std::sync::mpsc::Sender<Option<Peer>>>,
     apply_command_signal: Arc<Condvar>,
     keep_running: Arc<AtomicBool>,
@@ -123,13 +129,14 @@ impl Raft {
     pub fn new<Func>(
         peers: Vec<RpcClient>,
         me: usize,
+        persister: Arc<dyn Persister>,
         apply_command: Func,
     ) -> Self
     where
         Func: 'static + Send + FnMut(Index, Command),
     {
         let peer_size = peers.len();
-        let state = RaftState {
+        let mut state = RaftState {
             current_term: Term(0),
             voted_for: None,
             log: vec![LogEntry {
@@ -146,6 +153,14 @@ impl Raft {
             leader_id: Peer(me),
         };
 
+        if let Ok(persisted_state) =
+            PersistedRaftState::try_from(persister.read_state())
+        {
+            state.current_term = persisted_state.current_term;
+            state.voted_for = persisted_state.voted_for;
+            state.log = persisted_state.log;
+        }
+
         let election = ElectionState {
             timer: Mutex::new((0, None)),
             signal: Condvar::new(),
@@ -164,6 +179,7 @@ impl Raft {
             inner_state: Arc::new(Mutex::new(state)),
             peers,
             me: Peer(me),
+            persister,
             new_log_entry: None,
             apply_command_signal: Arc::new(Default::default()),
             keep_running: Arc::new(Default::default()),
@@ -172,7 +188,6 @@ impl Raft {
             stop_wait_group: WaitGroup::new(),
         };
 
-        // TODO: read persist.
         this.keep_running.store(true, Ordering::SeqCst);
         // Running in a standalone thread.
         this.run_log_entry_daemon();

+ 44 - 0
src/persister.rs

@@ -0,0 +1,44 @@
+use std::convert::TryFrom;
+
+use bytes::Bytes;
+
+use crate::{LogEntry, Peer, RaftState, Term};
+
+pub trait Persister: Send + Sync {
+    fn read_state(&self) -> Bytes;
+    fn save_state(&self, bytes: Bytes);
+}
+
+#[derive(Serialize, Deserialize)]
+pub(crate) struct PersistedRaftState {
+    pub current_term: Term,
+    pub voted_for: Option<Peer>,
+    pub log: Vec<LogEntry>,
+}
+
+impl<T: AsRef<RaftState>> From<T> for PersistedRaftState {
+    fn from(raft_state: T) -> Self {
+        let raft_state = raft_state.as_ref();
+        Self {
+            current_term: raft_state.current_term,
+            voted_for: raft_state.voted_for,
+            log: raft_state.log.clone(),
+        }
+    }
+}
+
+impl TryFrom<Bytes> for PersistedRaftState {
+    type Error = bincode::Error;
+
+    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
+        bincode::deserialize(value.as_ref())
+    }
+}
+
+impl Into<Bytes> for PersistedRaftState {
+    fn into(self) -> Bytes {
+        bincode::serialize(&self)
+            .expect("Serialization should not fail")
+            .into()
+    }
+}

+ 14 - 1
src/rpcs.rs

@@ -112,9 +112,21 @@ pub fn register_server<S: AsRef<str>>(
 
 #[cfg(test)]
 mod tests {
-    use super::*;
+    use bytes::Bytes;
+
     use crate::{Peer, Term};
 
+    use super::*;
+
+    type DoNothingPersister = ();
+    impl crate::Persister for DoNothingPersister {
+        fn read_state(&self) -> Bytes {
+            Bytes::new()
+        }
+
+        fn save_state(&self, _bytes: Bytes) {}
+    }
+
     #[test]
     fn test_basic_message() -> std::io::Result<()> {
         let client = {
@@ -129,6 +141,7 @@ mod tests {
             let raft = Arc::new(Raft::new(
                 vec![RpcClient(client.clone())],
                 0,
+                Arc::new(()),
                 |_, _| {},
             ));
             register_server(raft, name, network.as_ref())?;

+ 1 - 0
tests/agreement_tests.rs

@@ -1,5 +1,6 @@
 #[macro_use]
 extern crate anyhow;
+extern crate bytes;
 extern crate labrpc;
 extern crate ruaft;
 

+ 7 - 3
tests/config/mod.rs

@@ -11,6 +11,8 @@ use ruaft::rpcs::register_server;
 use ruaft::utils::DropGuard;
 use ruaft::{Raft, RpcClient};
 
+pub mod persister;
+
 struct ConfigState {
     rafts: Vec<Option<Raft>>,
     connected: Vec<bool>,
@@ -291,11 +293,13 @@ impl Config {
                 )))
             }
         }
+        let persister = Arc::new(persister::Persister::new());
 
         let log_clone = self.log.clone();
-        let raft = Raft::new(clients, index, move |cmd_index, cmd| {
-            Self::apply_command(log_clone.clone(), index, cmd_index, cmd.0)
-        });
+        let raft =
+            Raft::new(clients, index, persister, move |cmd_index, cmd| {
+                Self::apply_command(log_clone.clone(), index, cmd_index, cmd.0)
+            });
         self.state.lock().rafts[index].replace(raft.clone());
 
         let raft = Arc::new(raft);

+ 29 - 0
tests/config/persister/mod.rs

@@ -0,0 +1,29 @@
+use parking_lot::Mutex;
+
+struct State {
+    bytes: bytes::Bytes,
+}
+
+pub struct Persister {
+    state: Mutex<State>,
+}
+
+impl Persister {
+    pub fn new() -> Self {
+        Self {
+            state: Mutex::new(State {
+                bytes: bytes::Bytes::new(),
+            }),
+        }
+    }
+}
+
+impl ruaft::Persister for Persister {
+    fn read_state(&self) -> bytes::Bytes {
+        self.state.lock().bytes.clone()
+    }
+
+    fn save_state(&self, data: bytes::Bytes) {
+        self.state.lock().bytes = data;
+    }
+}

+ 1 - 0
tests/election_tests.rs

@@ -1,5 +1,6 @@
 #[macro_use]
 extern crate anyhow;
+extern crate bytes;
 extern crate labrpc;
 extern crate ruaft;
 

+ 0 - 0
tests/persist_tests.rs