Browse Source

Refactor: log vector to LogArray.

So that logs can be compressed easily. To compress the log, we ask
the application to copy the current state at a certain commit index,
and discard all logs before that index.
Jing Yang 5 years ago
parent
commit
f96c82b539
4 changed files with 240 additions and 26 deletions
  1. 9 17
      src/lib.rs
  2. 226 0
      src/log_array.rs
  3. 1 1
      src/persister.rs
  4. 4 8
      src/raft_state.rs

+ 9 - 17
src/lib.rs

@@ -23,6 +23,7 @@ pub(crate) use crate::raft_state::State;
 pub use crate::rpcs::RpcClient;
 use crate::utils::retry_rpc;
 
+mod log_array;
 mod persister;
 mod raft_state;
 pub mod rpcs;
@@ -137,11 +138,7 @@ where
         let mut state = RaftState {
             current_term: Term(0),
             voted_for: None,
-            log: vec![LogEntry {
-                term: Term(0),
-                index: 0,
-                command: Command::default(),
-            }],
+            log: log_array::LogArray::create(),
             commit_index: 0,
             last_applied: 0,
             next_index: vec![1; peer_size],
@@ -156,7 +153,7 @@ where
         {
             state.current_term = persisted_state.current_term;
             state.voted_for = persisted_state.voted_for;
-            state.log = persisted_state.log;
+            state.log = log_array::LogArray::restore(persisted_state.log).unwrap();
         }
 
         let election = ElectionState {
@@ -231,7 +228,7 @@ where
         }
 
         let voted_for = rf.voted_for;
-        let (last_log_index, last_log_term) = rf.last_log_index_and_term();
+        let (last_log_index, last_log_term) = rf.log.last_index_term();
         if (voted_for.is_none() || voted_for == Some(args.candidate_id))
             && (args.last_log_term > last_log_term
                 || (args.last_log_term == last_log_term
@@ -426,7 +423,7 @@ where
             self.persister.save_state(rf.persisted_state().into());
 
             let term = rf.current_term;
-            let (last_log_index, last_log_term) = rf.last_log_index_and_term();
+            let (last_log_index, last_log_term) = rf.log.last_index_term();
 
             (
                 term,
@@ -585,7 +582,7 @@ where
             return None;
         }
 
-        let (last_log_index, last_log_term) = rf.last_log_index_and_term();
+        let (last_log_index, last_log_term) = rf.log.last_index_term();
         let args = AppendEntriesArgs {
             term: rf.current_term,
             leader_id: rf.leader_id,
@@ -762,7 +759,7 @@ where
             leader_id: rf.leader_id,
             prev_log_index,
             prev_log_term,
-            entries: rf.log[rf.next_index[peer_index]..].to_vec(),
+            entries: rf.log.after(rf.next_index[peer_index]).to_vec(),
             leader_commit: rf.commit_index,
         })
     }
@@ -810,7 +807,7 @@ where
                     if rf.last_applied < rf.commit_index {
                         let index = rf.last_applied + 1;
                         let last_one = rf.commit_index + 1;
-                        let commands: Vec<Command> = rf.log[index..last_one]
+                        let commands: Vec<Command> = rf.log.between(index, last_one)
                             .iter()
                             .map(|entry| entry.command.clone())
                             .collect();
@@ -839,12 +836,7 @@ where
             return None;
         }
 
-        let index = rf.log.len();
-        rf.log.push(LogEntry {
-            term,
-            index,
-            command,
-        });
+        let index = rf.log.add(term, command);
         self.persister.save_state(rf.persisted_state().into());
 
         let _ = self.new_log_entry.clone().unwrap().send(None);

+ 226 - 0
src/log_array.rs

@@ -0,0 +1,226 @@
+use crate::{Command, Index, LogEntry, Term};
+use std::mem::swap;
+
+pub(crate) struct LogArray {
+    inner: Vec<LogEntry>,
+    snapshot: bytes::Bytes,
+}
+
+impl LogArray {
+    pub fn create() -> LogArray {
+        let ret = LogArray {
+            inner: vec![Self::build_first_entry(0, Term(0))],
+            snapshot: bytes::Bytes::new(),
+        };
+        ret.check_one_element();
+        ret
+    }
+
+    pub fn restore(inner: Vec<LogEntry>) -> std::io::Result<Self> {
+        Ok(LogArray {
+            inner,
+            snapshot: bytes::Bytes::new(),
+        })
+    }
+}
+
+// Log accessors
+impl LogArray {
+    pub fn start_offset(&self) -> Index {
+        self.first_entry().index
+    }
+
+    pub fn len(&self) -> usize {
+        self.start_offset() + self.inner.len()
+    }
+
+    #[allow(dead_code)]
+    pub fn first_index_term(&self) -> (Index, Term) {
+        let first_entry = self.first_entry();
+        (first_entry.index, first_entry.term)
+    }
+
+    pub fn last_index_term(&self) -> (Index, Term) {
+        let last_entry = self.last_entry();
+        (last_entry.index, last_entry.term)
+    }
+
+    pub fn at(&self, index: Index) -> &LogEntry {
+        let index = self.check_start_index(index);
+        &self.inner[index]
+    }
+
+    pub fn after(&self, index: Index) -> &[LogEntry] {
+        let index = self.check_start_index(index);
+        &self.inner[index..]
+    }
+
+    pub fn between(&self, start: Index, end: Index) -> &[LogEntry] {
+        let start = self.check_start_index(start);
+        let end = self.check_end_index(end);
+        &self.inner[start..end]
+    }
+
+    pub fn all(&self) -> &[LogEntry] {
+        &self.inner[..]
+    }
+
+    #[allow(dead_code)]
+    pub fn snapshot(&self) -> &bytes::Bytes {
+        &self.snapshot
+    }
+}
+
+impl std::ops::Index<usize> for LogArray {
+    type Output = LogEntry;
+
+    fn index(&self, index: usize) -> &Self::Output {
+        self.at(index)
+    }
+}
+
+// Mutations
+impl LogArray {
+    pub fn add(&mut self, term: Term, command: Command) -> Index {
+        let index = self.len();
+        self.push(LogEntry {
+            index,
+            term,
+            command,
+        });
+        index
+    }
+
+    pub fn push(&mut self, log_entry: LogEntry) {
+        let index = log_entry.index;
+        assert_eq!(
+            index,
+            self.len(),
+            "Expecting new index to be exact at len",
+        );
+        self.inner.push(log_entry);
+        assert_eq!(
+            index + 1,
+            self.len(),
+            "Expecting len increase by one after push",
+        );
+        assert_eq!(
+            self.at(index).index,
+            index,
+            "Expecting pushed element to have the same index",
+        );
+        self.check_one_element();
+    }
+
+    pub fn truncate(&mut self, index: Index) {
+        let index = self.check_middle_index(index);
+        self.inner.truncate(index);
+        self.check_one_element()
+    }
+
+    #[allow(dead_code)]
+    pub fn shift(&mut self, index: Index, snapshot: bytes::Bytes) {
+        // Discard everything before index and store the snapshot.
+        let offset = self.check_middle_index(index);
+        self.inner = self.inner.split_off(offset);
+        self.snapshot = snapshot;
+
+        // Override the first entry, we know there is at least one entry. This is not strictly
+        // needed. One benefit is that the command can be released after this point.
+        let (first_index, first_term) = self.first_index_term();
+        self.inner[0] = Self::build_first_entry(first_index, first_term);
+
+        assert_eq!(
+            first_index, index,
+            "Expecting the first entry to have the same index."
+        );
+
+        self.check_one_element()
+    }
+
+    #[allow(dead_code)]
+    pub fn reset(
+        &mut self,
+        index: Index,
+        term: Term,
+        snapshot: bytes::Bytes,
+    ) -> Vec<LogEntry> {
+        let mut inner = vec![Self::build_first_entry(index, term)];
+        swap(&mut inner, &mut self.inner);
+        self.snapshot = snapshot;
+
+        self.check_one_element();
+
+        inner
+    }
+}
+
+impl LogArray {
+    fn first_entry(&self) -> &LogEntry {
+        self.inner
+            .first()
+            .expect("There must be at least one element in log")
+    }
+
+    fn last_entry(&self) -> &LogEntry {
+        &self
+            .inner
+            .last()
+            .expect("There must be at least one entry in log")
+    }
+
+    fn offset(&self, index: Index) -> usize {
+        index - self.start_offset()
+    }
+
+    fn check_start_index(&self, index: Index) -> usize {
+        assert!(
+            index >= self.start_offset() && index < self.len(),
+            "Accessing start log index {} out of range [{}, {})",
+            index,
+            self.start_offset(),
+            self.len()
+        );
+
+        self.offset(index)
+    }
+
+    fn check_end_index(&self, index: Index) -> usize {
+        assert!(
+            index > self.start_offset() && index <= self.len(),
+            "Accessing end log index {} out of range ({}, {}]",
+            index,
+            self.start_offset(),
+            self.len()
+        );
+
+        self.offset(index)
+    }
+
+    fn check_middle_index(&self, index: Index) -> usize {
+        assert!(
+            index > self.start_offset() && index < self.len(),
+            "Log index {} out of range ({}, {})",
+            index,
+            self.start_offset(),
+            self.len()
+        );
+
+        self.offset(index)
+    }
+
+    fn check_one_element(&self) {
+        assert!(
+            self.inner.len() >= 1,
+            "There must be at least one element in log"
+        )
+    }
+
+    fn build_first_entry(index: Index, term: Term) -> LogEntry {
+        LogEntry {
+            index,
+            term,
+            command: Command(0),
+        }
+    }
+}

+ 1 - 1
src/persister.rs

@@ -31,7 +31,7 @@ impl<Command: Clone> From<&RaftState<Command>> for PersistedRaftState<Command> {
         Self {
             current_term: raft_state.current_term,
             voted_for: raft_state.voted_for,
-            log: raft_state.log.clone(),
+            log: raft_state.log.all().to_vec(),
         }
     }
 }

+ 4 - 8
src/raft_state.rs

@@ -1,4 +1,6 @@
-use crate::{persister::PersistedRaftState, Index, LogEntry, Peer, Term};
+use crate::{
+    log_array::LogArray, persister::PersistedRaftState, Index, Peer, Term,
+};
 
 #[derive(Debug, Eq, PartialEq)]
 pub(crate) enum State {
@@ -11,7 +13,7 @@ pub(crate) enum State {
 pub(crate) struct RaftState<Command> {
     pub current_term: Term,
     pub voted_for: Option<Peer>,
-    pub log: Vec<LogEntry<Command>>,
+    pub log: LogArray,
 
     pub commit_index: Index,
     pub last_applied: Index,
@@ -32,12 +34,6 @@ impl<Command: Clone> RaftState<Command> {
 }
 
 impl<Command> RaftState<Command> {
-    pub fn last_log_index_and_term(&self) -> (Index, Term) {
-        let len = self.log.len();
-        assert!(len > 0, "There should always be at least one entry in log");
-        (len - 1, self.log.last().unwrap().term)
-    }
-
     pub fn is_leader(&self) -> bool {
         self.state == State::Leader
     }