Просмотр исходного кода

Merge branch 'log_array': upgrade log array to support snapshot taking.

Jing Yang 5 лет назад
Родитель
Сommit
cd31fae304
6 измененных файлов с 792 добавлено и 41 удалено
  1. 37 0
      src/index_term.rs
  2. 28 29
      src/lib.rs
  3. 717 0
      src/log_array.rs
  4. 1 1
      src/persister.rs
  5. 4 8
      src/raft_state.rs
  6. 5 3
      src/rpcs.rs

+ 37 - 0
src/index_term.rs

@@ -0,0 +1,37 @@
+use crate::{Index, LogEntry, Term};
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub(crate) struct IndexTerm {
+    pub index: Index,
+    pub term: Term,
+}
+
+impl<C> From<&LogEntry<C>> for IndexTerm {
+    fn from(entry: &LogEntry<C>) -> Self {
+        Self {
+            index: entry.index,
+            term: entry.term,
+        }
+    }
+}
+
+impl From<IndexTerm> for (Index, Term) {
+    fn from(index_term: IndexTerm) -> Self {
+        index_term.unpack()
+    }
+}
+
+impl From<(Index, Term)> for IndexTerm {
+    fn from(index_term: (Index, Term)) -> Self {
+        IndexTerm {
+            index: index_term.0,
+            term: index_term.1,
+        }
+    }
+}
+
+impl IndexTerm {
+    pub fn unpack(&self) -> (Index, Term) {
+        (self.index, self.term)
+    }
+}

+ 28 - 29
src/lib.rs

@@ -23,6 +23,8 @@ pub(crate) use crate::raft_state::State;
 pub use crate::rpcs::RpcClient;
 pub use crate::rpcs::RpcClient;
 use crate::utils::retry_rpc;
 use crate::utils::retry_rpc;
 
 
+mod index_term;
+mod log_array;
 mod persister;
 mod persister;
 mod raft_state;
 mod raft_state;
 pub mod rpcs;
 pub mod rpcs;
@@ -136,11 +138,7 @@ where
         let mut state = RaftState {
         let mut state = RaftState {
             current_term: Term(0),
             current_term: Term(0),
             voted_for: None,
             voted_for: None,
-            log: vec![LogEntry {
-                term: Term(0),
-                index: 0,
-                command: Command::default(),
-            }],
+            log: log_array::LogArray::create(),
             commit_index: 0,
             commit_index: 0,
             last_applied: 0,
             last_applied: 0,
             next_index: vec![1; peer_size],
             next_index: vec![1; peer_size],
@@ -155,7 +153,8 @@ where
         {
         {
             state.current_term = persisted_state.current_term;
             state.current_term = persisted_state.current_term;
             state.voted_for = persisted_state.voted_for;
             state.voted_for = persisted_state.voted_for;
-            state.log = persisted_state.log;
+            state.log =
+                log_array::LogArray::restore(persisted_state.log).unwrap();
         }
         }
 
 
         let election = ElectionState {
         let election = ElectionState {
@@ -203,9 +202,10 @@ where
 // Command must be
 // Command must be
 // 1. clone: they are copied to the persister.
 // 1. clone: they are copied to the persister.
 // 2. serialize: they are converted to bytes to persist.
 // 2. serialize: they are converted to bytes to persist.
+// 3. default: a default value is used as the first element of the log.
 impl<Command> Raft<Command>
 impl<Command> Raft<Command>
 where
 where
-    Command: Clone + serde::Serialize,
+    Command: Clone + serde::Serialize + Default,
 {
 {
     pub(crate) fn process_request_vote(
     pub(crate) fn process_request_vote(
         &self,
         &self,
@@ -230,11 +230,11 @@ where
         }
         }
 
 
         let voted_for = rf.voted_for;
         let voted_for = rf.voted_for;
-        let (last_log_index, last_log_term) = rf.last_log_index_and_term();
+        let last_log = rf.log.last_index_term();
         if (voted_for.is_none() || voted_for == Some(args.candidate_id))
         if (voted_for.is_none() || voted_for == Some(args.candidate_id))
-            && (args.last_log_term > last_log_term
-                || (args.last_log_term == last_log_term
-                    && args.last_log_index >= last_log_index))
+            && (args.last_log_term > last_log.term
+                || (args.last_log_term == last_log.term
+                    && args.last_log_index >= last_log.index))
         {
         {
             rf.voted_for = Some(args.candidate_id);
             rf.voted_for = Some(args.candidate_id);
 
 
@@ -279,7 +279,7 @@ where
 
 
         self.election.reset_election_timer();
         self.election.reset_election_timer();
 
 
-        if rf.log.len() <= args.prev_log_index
+        if rf.log.end() <= args.prev_log_index
             || rf.log[args.prev_log_index].term != args.prev_log_term
             || rf.log[args.prev_log_index].term != args.prev_log_term
         {
         {
             return AppendEntriesReply {
             return AppendEntriesReply {
@@ -290,7 +290,7 @@ where
 
 
         for (i, entry) in args.entries.iter().enumerate() {
         for (i, entry) in args.entries.iter().enumerate() {
             let index = i + args.prev_log_index + 1;
             let index = i + args.prev_log_index + 1;
-            if rf.log.len() > index {
+            if rf.log.end() > index {
                 if rf.log[index].term != entry.term {
                 if rf.log[index].term != entry.term {
                     rf.log.truncate(index);
                     rf.log.truncate(index);
                     rf.log.push(entry.clone());
                     rf.log.push(entry.clone());
@@ -303,10 +303,10 @@ where
         self.persister.save_state(rf.persisted_state().into());
         self.persister.save_state(rf.persisted_state().into());
 
 
         if args.leader_commit > rf.commit_index {
         if args.leader_commit > rf.commit_index {
-            rf.commit_index = if args.leader_commit < rf.log.len() {
+            rf.commit_index = if args.leader_commit < rf.log.end() {
                 args.leader_commit
                 args.leader_commit
             } else {
             } else {
-                rf.log.len() - 1
+                rf.log.last_index_term().index
             };
             };
             self.apply_command_signal.notify_one();
             self.apply_command_signal.notify_one();
         }
         }
@@ -323,9 +323,10 @@ where
 // 1. clone: they are copied to the persister.
 // 1. clone: they are copied to the persister.
 // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
 // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
 // 3. serialize: they are converted to bytes to persist.
 // 3. serialize: they are converted to bytes to persist.
+// 4. default: a default value is used as the first element of log.
 impl<Command> Raft<Command>
 impl<Command> Raft<Command>
 where
 where
-    Command: 'static + Clone + Send + serde::Serialize,
+    Command: 'static + Clone + Send + serde::Serialize + Default,
 {
 {
     fn run_election_timer(&self) -> std::thread::JoinHandle<()> {
     fn run_election_timer(&self) -> std::thread::JoinHandle<()> {
         let this = self.clone();
         let this = self.clone();
@@ -425,7 +426,8 @@ where
             self.persister.save_state(rf.persisted_state().into());
             self.persister.save_state(rf.persisted_state().into());
 
 
             let term = rf.current_term;
             let term = rf.current_term;
-            let (last_log_index, last_log_term) = rf.last_log_index_and_term();
+            let (last_log_index, last_log_term) =
+                rf.log.last_index_term().unpack();
 
 
             (
             (
                 term,
                 term,
@@ -535,7 +537,7 @@ where
 
 
             rf.state = State::Leader;
             rf.state = State::Leader;
             rf.leader_id = me;
             rf.leader_id = me;
-            let log_len = rf.log.len();
+            let log_len = rf.log.end();
             for item in rf.next_index.iter_mut() {
             for item in rf.next_index.iter_mut() {
                 *item = log_len;
                 *item = log_len;
             }
             }
@@ -584,12 +586,12 @@ where
             return None;
             return None;
         }
         }
 
 
-        let (last_log_index, last_log_term) = rf.last_log_index_and_term();
+        let last_log = rf.log.last_index_term();
         let args = AppendEntriesArgs {
         let args = AppendEntriesArgs {
             term: rf.current_term,
             term: rf.current_term,
             leader_id: rf.leader_id,
             leader_id: rf.leader_id,
-            prev_log_index: last_log_index,
-            prev_log_term: last_log_term,
+            prev_log_index: last_log.index,
+            prev_log_term: last_log.term,
             entries: vec![],
             entries: vec![],
             leader_commit: rf.commit_index,
             leader_commit: rf.commit_index,
         };
         };
@@ -761,7 +763,7 @@ where
             leader_id: rf.leader_id,
             leader_id: rf.leader_id,
             prev_log_index,
             prev_log_index,
             prev_log_term,
             prev_log_term,
-            entries: rf.log[rf.next_index[peer_index]..].to_vec(),
+            entries: rf.log.after(rf.next_index[peer_index]).to_vec(),
             leader_commit: rf.commit_index,
             leader_commit: rf.commit_index,
         })
         })
     }
     }
@@ -809,7 +811,9 @@ where
                     if rf.last_applied < rf.commit_index {
                     if rf.last_applied < rf.commit_index {
                         let index = rf.last_applied + 1;
                         let index = rf.last_applied + 1;
                         let last_one = rf.commit_index + 1;
                         let last_one = rf.commit_index + 1;
-                        let commands: Vec<Command> = rf.log[index..last_one]
+                        let commands: Vec<Command> = rf
+                            .log
+                            .between(index, last_one)
                             .iter()
                             .iter()
                             .map(|entry| entry.command.clone())
                             .map(|entry| entry.command.clone())
                             .collect();
                             .collect();
@@ -838,12 +842,7 @@ where
             return None;
             return None;
         }
         }
 
 
-        let index = rf.log.len();
-        rf.log.push(LogEntry {
-            term,
-            index,
-            command,
-        });
+        let index = rf.log.add_command(term, command);
         self.persister.save_state(rf.persisted_state().into());
         self.persister.save_state(rf.persisted_state().into());
 
 
         let _ = self.new_log_entry.clone().unwrap().send(None);
         let _ = self.new_log_entry.clone().unwrap().send(None);

+ 717 - 0
src/log_array.rs

@@ -0,0 +1,717 @@
+use crate::index_term::IndexTerm;
+use crate::{Index, LogEntry, Term};
+
+/// A log array that stores a tail of the whole Raft log.
+///
+/// The Raft log represented by the log array has length `end()`. Only log
+/// entries after `start()` are physically stored in the log array (with some
+/// caveats). The index and term of log entries in range `[start(), end())` are
+/// accessible. A snapshot is stored at the beginning of the log array, which
+/// covers all commands before and **including** `start()`. The command at
+/// `start()` is **not** accessible, but all commands after that are.
+///
+/// New entries can be appended to the end of the Raft log via `add_command()`
+/// or `push()`.
+///
+/// The log array can be truncated to at most one entry, which is at `start()`
+/// and contains the snapshot. The start of the log array can be shifted towards
+/// `end()`, if another snapshot at that position is provided.
+///
+/// The log array can also be reset to a single entry, contains an index, a term
+/// and a snapshot, via `reset()`.
+///
+/// The log array is guaranteed to have at least one element, containing an
+/// index, a term and a snapshot.
+///
+/// All APIs **will** panic if the given index(es) are out of bound.
+///
+/// NOT THREAD SAFE.
+pub(crate) struct LogArray<C> {
+    inner: Vec<LogEntry<C>>,
+    snapshot: Vec<u8>,
+}
+
+impl<C: Default> LogArray<C> {
+    /// Create the initial Raft log with no user-supplied commands.
+    pub fn create() -> LogArray<C> {
+        let ret = LogArray {
+            inner: vec![Self::build_first_entry(0, Term(0))],
+            snapshot: vec![],
+        };
+        ret.check_one_element();
+        ret
+    }
+
+    pub fn restore(inner: Vec<LogEntry<C>>) -> std::io::Result<Self> {
+        Ok(LogArray {
+            inner,
+            snapshot: vec![],
+        })
+    }
+}
+
+// Log accessors
+impl<C> LogArray<C> {
+    /// The start of the stored log entries. The command at this index is no
+    /// longer accessible, since it is included in the snapshot.
+    pub fn start(&self) -> Index {
+        self.first_entry().index
+    }
+
+    /// The end index of the Raft log.
+    pub fn end(&self) -> usize {
+        self.start() + self.inner.len()
+    }
+
+    /// The first index and term stored in this log array.
+    pub fn first_index_term(&self) -> IndexTerm {
+        self.first_entry().into()
+    }
+
+    /// The last index and term of the Raft log.
+    pub fn last_index_term(&self) -> IndexTerm {
+        self.last_entry().into()
+    }
+
+    /// The log entry at the given index.
+    pub fn at(&self, index: Index) -> &LogEntry<C> {
+        let index = self.check_start_index(index);
+        &self.inner[index]
+    }
+
+    /// All log entries after the given index.
+    pub fn after(&self, index: Index) -> &[LogEntry<C>] {
+        let index = self.check_start_index(index);
+        &self.inner[index..]
+    }
+
+    /// All log entries in range [start, end).
+    pub fn between(&self, start: Index, end: Index) -> &[LogEntry<C>] {
+        let start = self.check_start_index(start);
+        let end = self.check_end_index(end);
+        &self.inner[start..end]
+    }
+
+    /// All log entries stored in the array.
+    pub fn all(&self) -> &[LogEntry<C>] {
+        &self.inner[..]
+    }
+
+    /// The snapshot before and including `start()`.
+    #[allow(dead_code)]
+    pub fn snapshot(&self) -> (IndexTerm, &[u8]) {
+        (self.first_index_term(), &self.snapshot)
+    }
+}
+
+impl<C> std::ops::Index<usize> for LogArray<C> {
+    type Output = LogEntry<C>;
+
+    fn index(&self, index: usize) -> &Self::Output {
+        self.at(index)
+    }
+}
+
+// Mutations
+impl<C> LogArray<C> {
+    /// Add a new entry to the Raft log, with term and command. The new index is
+    /// returned.
+    pub fn add_command(&mut self, term: Term, command: C) -> Index {
+        let index = self.end();
+        self.push(LogEntry {
+            index,
+            term,
+            command,
+        });
+        index
+    }
+
+    /// Push a LogEntry into the Raft log. The index of the log entry must match
+    /// the next index in the log.
+    pub fn push(&mut self, log_entry: LogEntry<C>) {
+        let index = log_entry.index;
+        assert_eq!(index, self.end(), "Expecting new index to be exact at len");
+        self.inner.push(log_entry);
+        assert_eq!(
+            index + 1,
+            self.end(),
+            "Expecting len increase by one after push",
+        );
+        assert_eq!(
+            self.at(index).index,
+            index,
+            "Expecting pushed element to have the same index",
+        );
+        self.check_one_element();
+    }
+
+    /// Remove all log entries on and after `index`.
+    pub fn truncate(&mut self, index: Index) {
+        let index = self.check_middle_index(index);
+        self.inner.truncate(index);
+        self.check_one_element()
+    }
+}
+
+impl<C: Default> LogArray<C> {
+    /// Shift the start of the array to `index`, and store a new snapshot that
+    /// covers all commands before and at `index`.
+    #[allow(dead_code)]
+    pub fn shift(&mut self, index: Index, snapshot: Vec<u8>) {
+        // Discard everything before index and store the snapshot.
+        let offset = self.check_middle_index(index);
+        // WARNING: Potentially all entries after offset would be copied.
+        self.inner.drain(0..offset);
+        self.snapshot = snapshot;
+
+        // Override the first entry, we know there is at least one entry. This
+        // is not strictly needed. One benefit is that the command can be
+        // released after this point.
+        let first = self.first_index_term();
+        self.inner[0] = Self::build_first_entry(first.index, first.term);
+
+        assert_eq!(
+            first.index, index,
+            "Expecting the first entry to have the same index."
+        );
+
+        self.check_one_element()
+    }
+
+    /// Reset the array to contain only one snapshot at the given `index` with
+    /// the given `term`.
+    #[allow(dead_code)]
+    pub fn reset(
+        &mut self,
+        index: Index,
+        term: Term,
+        snapshot: Vec<u8>,
+    ) -> Vec<LogEntry<C>> {
+        let mut inner = vec![Self::build_first_entry(index, term)];
+        std::mem::swap(&mut inner, &mut self.inner);
+        self.snapshot = snapshot;
+
+        self.check_one_element();
+
+        inner
+    }
+}
+
+impl<C> LogArray<C> {
+    fn first_entry(&self) -> &LogEntry<C> {
+        self.inner
+            .first()
+            .expect("There must be at least one element in log")
+    }
+
+    fn last_entry(&self) -> &LogEntry<C> {
+        &self
+            .inner
+            .last()
+            .expect("There must be at least one entry in log")
+    }
+
+    fn offset(&self, index: Index) -> usize {
+        index - self.start()
+    }
+
+    fn check_start_index(&self, index: Index) -> usize {
+        assert!(
+            index >= self.start() && index < self.end(),
+            "Accessing start log index {} out of range [{}, {})",
+            index,
+            self.start(),
+            self.end()
+        );
+
+        self.offset(index)
+    }
+
+    fn check_end_index(&self, index: Index) -> usize {
+        assert!(
+            index > self.start() && index <= self.end(),
+            "Accessing end log index {} out of range ({}, {}]",
+            index,
+            self.start(),
+            self.end()
+        );
+
+        self.offset(index)
+    }
+
+    fn check_middle_index(&self, index: Index) -> usize {
+        assert!(
+            index > self.start() && index < self.end(),
+            "Log index {} out of range ({}, {})",
+            index,
+            self.start(),
+            self.end()
+        );
+
+        self.offset(index)
+    }
+
+    fn check_one_element(&self) {
+        assert!(
+            self.inner.len() >= 1,
+            "There must be at least one element in log"
+        )
+    }
+}
+
+impl<C: Default> LogArray<C> {
+    fn build_first_entry(index: Index, term: Term) -> LogEntry<C> {
+        LogEntry {
+            index,
+            term,
+            command: C::default(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::panic::catch_unwind;
+
+    use super::*;
+
+    fn make_log_array(len: usize) -> LogArray<i32> {
+        make_log_array_range(0, len)
+    }
+
+    fn make_log_array_range(start: usize, end: usize) -> LogArray<i32> {
+        let mut ret = vec![];
+        for i in start..end {
+            ret.push(LogEntry {
+                term: Term(i / 3),
+                index: i,
+                command: (end - i) as i32,
+            })
+        }
+
+        LogArray {
+            inner: ret,
+            snapshot: vec![1, 2, 3],
+        }
+    }
+
+    fn default_log_array() -> (usize, usize, LogArray<i32>) {
+        (8, 17, make_log_array_range(8, 17))
+    }
+
+    #[test]
+    fn test_create() {
+        let log = LogArray::<i32>::create();
+        log.check_one_element();
+
+        assert_eq!(1, log.end());
+        assert_eq!((0, Term(0)), log.first_index_term().into());
+        assert_eq!(0, log[0].command);
+    }
+
+    // TODO(ditsing): add invariant checks to restore and write a test.
+    #[test]
+    fn test_restore() {}
+
+    #[test]
+    fn test_start() {
+        let log = make_log_array_range(9, 17);
+        assert_eq!(9, log.start());
+
+        let log = make_log_array(9);
+        assert_eq!(0, log.start());
+    }
+
+    #[test]
+    fn test_end() {
+        let log = make_log_array(7);
+        assert_eq!(7, log.end());
+
+        let log = make_log_array_range(8, 17);
+        assert_eq!(17, log.end());
+    }
+
+    #[test]
+    fn test_accessors() {
+        let (start, end, log) = default_log_array();
+        assert_eq!((start, Term(2)), log.first_index_term().into());
+        assert_eq!((end - 1, Term(5)), log.last_index_term().into());
+        assert_eq!(
+            ((start, Term(2)).into(), [1, 2, 3].as_ref()),
+            log.snapshot()
+        );
+
+        let all = log.all();
+        assert_eq!(end - start, all.len());
+        for i in start..end {
+            assert_eq!(all[i - start].index, i);
+        }
+    }
+
+    #[test]
+    fn test_at() {
+        let (start, end, log) = default_log_array();
+
+        let last = log.at(end - 1);
+        assert_eq!(end - 1, last.index);
+        assert_eq!(5, last.term.0);
+        assert_eq!(1, last.command);
+
+        let first = log.at(start);
+        assert_eq!(start, first.index);
+        assert_eq!(2, first.term.0);
+        assert_eq!(9, first.command);
+
+        assert!(start < 12);
+        assert!(end > 12);
+        let middle = log.at(12);
+        assert_eq!(12, middle.index);
+        assert_eq!(4, middle.term.0);
+        assert_eq!(5, middle.command);
+
+        let at_before_start = catch_unwind(|| {
+            log.at(start - 1);
+        });
+        assert!(at_before_start.is_err());
+        let at_after_end = catch_unwind(|| {
+            log.at(end);
+        });
+        assert!(at_after_end.is_err());
+    }
+
+    #[test]
+    fn test_index_operator() {
+        let (start, end, log) = default_log_array();
+
+        for i in start..end {
+            assert_eq!(log[i].index, log.at(i).index);
+            assert_eq!(log[i].term, log.at(i).term);
+            assert_eq!(log[i].command, log.at(i).command);
+        }
+        assert!(catch_unwind(|| log[0].term).is_err());
+        assert!(catch_unwind(|| log[20].term).is_err());
+    }
+
+    #[test]
+    fn test_after() {
+        let (start, end, log) = default_log_array();
+
+        let offset = 12;
+        assert!(offset > start);
+        assert!(offset < end);
+
+        let after = log.after(offset);
+        assert_eq!(end - offset, after.len());
+        for i in offset..end {
+            assert_eq!(after[i - offset].index, i);
+            assert_eq!(after[i - offset].term.0, i / 3);
+        }
+
+        assert!(catch_unwind(|| log.after(start - 1)).is_err());
+        assert!(catch_unwind(|| log.after(start)).is_ok());
+        assert!(catch_unwind(|| log.after(end)).is_err());
+        assert!(catch_unwind(|| log.after(end + 1)).is_err());
+    }
+
+    #[test]
+    fn test_between() {
+        let (start, end, log) = default_log_array();
+
+        let between = log.between(start + 2, end);
+        assert_eq!(end - start - 2, between.len());
+        for i in start + 2..end {
+            assert_eq!(between[i - start - 2].index, i);
+            assert_eq!(between[i - start - 2].term.0, i / 3);
+        }
+
+        assert!(catch_unwind(|| log.between(start - 1, end)).is_err());
+        assert!(catch_unwind(|| log.between(start + 2, end + 1)).is_err());
+        assert!(catch_unwind(|| log.between(start, end)).is_ok());
+    }
+
+    #[test]
+    fn test_add_command() {
+        let (_, end, mut log) = default_log_array();
+        let index = log.add_command(Term(8), 9);
+        assert_eq!(8, log.at(index).term.0);
+        assert_eq!(9, log.at(index).command);
+        assert_eq!(index, end);
+        assert_eq!(index + 1, log.end());
+    }
+
+    #[test]
+    fn test_push() {
+        let (_, end, mut log) = default_log_array();
+        log.push(LogEntry {
+            term: Term(8),
+            index: end,
+            command: 1,
+        });
+        assert_eq!(8, log.at(end).term.0);
+        assert_eq!(1, log.at(end).command);
+        assert_eq!(end + 1, log.end());
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_push_small_index() {
+        let (_, end, mut log) = default_log_array();
+        log.push(LogEntry {
+            term: Term(8),
+            index: end - 1,
+            command: 1,
+        });
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_push_big_index() {
+        let (_, end, mut log) = default_log_array();
+        log.push(LogEntry {
+            term: Term(8),
+            index: end + 1,
+            command: 1,
+        });
+    }
+
+    #[test]
+    fn test_truncate() {
+        let (start, _, mut log) = default_log_array();
+        log.truncate(start + 5);
+        assert_eq!(start + 5, log.end());
+        for i in start..start + 5 {
+            assert_eq!(log[i].index, i);
+            assert_eq!(log[i].term.0, i / 3);
+        }
+
+        log.truncate(start + 1);
+        assert_eq!(1, log.all().len());
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_truncate_at_start() {
+        let (start, _, mut log) = default_log_array();
+        log.truncate(start);
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_truncate_at_end() {
+        let (_, end, mut log) = default_log_array();
+        log.truncate(end);
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_truncate_before_start() {
+        let (start, _, mut log) = default_log_array();
+        log.truncate(start - 1);
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_truncate_after_end() {
+        let (_, end, mut log) = default_log_array();
+        log.truncate(end + 1);
+    }
+
+    #[test]
+    fn test_shift() {
+        let (start, end, mut log) = default_log_array();
+        let offset = 10;
+        assert!(offset > start);
+        assert!(offset < end);
+
+        log.shift(offset, vec![]);
+
+        assert_eq!((offset, Term(3)), log.first_index_term().into());
+        assert_eq!(0, log[offset].command);
+
+        let all = log.all();
+        assert_eq!(end - offset, all.len());
+        for i in offset..end {
+            assert_eq!(i, all[i - offset].index);
+            assert_eq!(i / 3, all[i - offset].term.0);
+        }
+
+        assert_eq!(log.snapshot, vec![]);
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_shift_before_start() {
+        let (start, end, mut log) = default_log_array();
+        assert!(start < end);
+        log.shift(start - 1, vec![]);
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_shift_at_start() {
+        let (start, end, mut log) = default_log_array();
+        assert!(start < end);
+        log.shift(start, vec![]);
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_shift_at_end() {
+        let (start, end, mut log) = default_log_array();
+        assert!(start < end);
+        log.shift(end, vec![]);
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_shift_after_end() {
+        let (start, end, mut log) = default_log_array();
+        assert!(start < end);
+        log.shift(end + 1, vec![]);
+    }
+
+    #[test]
+    fn test_reset() {
+        let (start, end, mut log) = default_log_array();
+        let dump = log.reset(88, Term(99), vec![1, 2]);
+        assert_eq!(1, log.all().len());
+        assert_eq!(vec![1, 2], log.snapshot);
+        assert_eq!(88, log[88].index);
+        assert_eq!(99, log[88].term.0);
+        assert_eq!(0, log[88].command);
+
+        assert_eq!(end - start, dump.len());
+    }
+
+    #[test]
+    fn test_private_accessors() {
+        let (start, end, log) = default_log_array();
+        let first = log.first_entry();
+        assert_eq!(start, first.index);
+        assert_eq!(start / 3, first.term.0);
+
+        let last = log.last_entry();
+        assert_eq!(end - 1, last.index);
+        assert_eq!((end - 1) / 3, last.term.0);
+
+        assert_eq!(10 - start, log.offset(10));
+    }
+
+    #[test]
+    fn test_checks_start_index() {
+        let (start, end, log) = default_log_array();
+        assert!(start < end);
+        assert!(catch_unwind(|| log.check_start_index(start - 8)).is_err());
+        assert!(catch_unwind(|| log.check_start_index(start - 1)).is_err());
+        assert!(catch_unwind(|| log.check_start_index(start)).is_ok());
+        assert!(catch_unwind(|| log.check_start_index(start + 1)).is_ok());
+        assert!(catch_unwind(|| log.check_start_index(end - 1)).is_ok());
+        assert!(catch_unwind(|| log.check_start_index(end)).is_err());
+        assert!(catch_unwind(|| log.check_start_index(end + 1)).is_err());
+        assert!(catch_unwind(|| log.check_start_index(end + 5)).is_err());
+    }
+
+    #[test]
+    fn test_checks_end_index() {
+        let (start, end, log) = default_log_array();
+        assert!(start < end);
+        assert!(catch_unwind(|| log.check_end_index(start - 8)).is_err());
+        assert!(catch_unwind(|| log.check_end_index(start - 1)).is_err());
+        assert!(catch_unwind(|| log.check_end_index(start)).is_err());
+        assert!(catch_unwind(|| log.check_end_index(start + 1)).is_ok());
+        assert!(catch_unwind(|| log.check_end_index(end - 1)).is_ok());
+        assert!(catch_unwind(|| log.check_end_index(end)).is_ok());
+        assert!(catch_unwind(|| log.check_end_index(end + 1)).is_err());
+        assert!(catch_unwind(|| log.check_end_index(end + 5)).is_err());
+    }
+
+    #[test]
+    fn test_checks_middle_index() {
+        let (start, end, log) = default_log_array();
+        assert!(start < end);
+        assert!(catch_unwind(|| log.check_middle_index(start - 8)).is_err());
+        assert!(catch_unwind(|| log.check_middle_index(start - 1)).is_err());
+        assert!(catch_unwind(|| log.check_middle_index(start)).is_err());
+        assert!(catch_unwind(|| log.check_middle_index(start + 1)).is_ok());
+        assert!(catch_unwind(|| log.check_middle_index(end - 1)).is_ok());
+        assert!(catch_unwind(|| log.check_middle_index(end)).is_err());
+        assert!(catch_unwind(|| log.check_middle_index(end + 1)).is_err());
+        assert!(catch_unwind(|| log.check_middle_index(end + 5)).is_err());
+    }
+
+    #[test]
+    fn test_checks_one_element() {
+        let log = make_log_array(0);
+        assert!(catch_unwind(|| log.check_one_element()).is_err());
+    }
+
+    #[test]
+    fn test_integration_test() {
+        let mut log = make_log_array(1);
+        log.add_command(Term(3), 19);
+        log.push(LogEntry {
+            term: Term(3),
+            index: 2,
+            command: 3,
+        });
+        log.add_command(Term(4), 20);
+        log.push(LogEntry {
+            term: Term(4),
+            index: 4,
+            command: 7,
+        });
+
+        for i in 0..100 {
+            log.add_command(Term(5), i);
+        }
+        assert_eq!(0, log.start());
+        assert_eq!(105, log.end());
+
+        assert_eq!((0, Term(0)), log.first_index_term().into());
+        assert_eq!((104, Term(5)), log.last_index_term().into());
+
+        assert_eq!(8, log.at(8).index);
+        assert_eq!(5, log[8].term.0);
+        assert_eq!(7, log[4].command);
+
+        log.truncate(50);
+        // End changed, start does not.
+        assert_eq!(0, log.start());
+        assert_eq!(50, log.end());
+
+        assert_eq!((49, Term(5)), log.last_index_term().into());
+        assert_eq!(49, log.at(49).index);
+        assert_eq!(44, log[49].command);
+        assert_eq!(5, log.at(5).term.0);
+        // Cannot assert 50 is out of range. log is mut and cannot be used in
+        // catch_unwind().
+
+        // Snapshot is not changed.
+        assert_eq!(((0, Term(0)).into(), [1, 2, 3].as_ref()), log.snapshot());
+
+        log.shift(5, vec![]);
+        // Start changed, end did not;
+        assert_eq!(5, log.start());
+
+        assert_eq!((5, Term(5)), log.first_index_term().into());
+        assert_eq!(5, log.at(5).index);
+        assert_eq!(5, log.at(5).term.0);
+        // Cannot assert 4 is out of range. log is mut and cannot be used in
+        // catch_unwind().
+
+        // Snapshot is changed, too.
+        assert_eq!(((5, Term(5)).into(), [].as_ref()), log.snapshot());
+
+        // Ranged accessors.
+        assert_eq!(45, log.all().len());
+        assert_eq!(10, log.after(40).len());
+        assert_eq!(20, log.between(20, 40).len());
+
+        // Reset!
+        log.reset(9, Term(7), vec![7, 8, 9]);
+        assert_eq!(10, log.end());
+        assert_eq!(1, log.all().len());
+        assert_eq!(log.first_index_term(), log.last_index_term());
+        assert_eq!(((9, Term(7)).into(), [7, 8, 9].as_ref()), log.snapshot());
+    }
+}

+ 1 - 1
src/persister.rs

@@ -31,7 +31,7 @@ impl<Command: Clone> From<&RaftState<Command>> for PersistedRaftState<Command> {
         Self {
         Self {
             current_term: raft_state.current_term,
             current_term: raft_state.current_term,
             voted_for: raft_state.voted_for,
             voted_for: raft_state.voted_for,
-            log: raft_state.log.clone(),
+            log: raft_state.log.all().to_vec(),
         }
         }
     }
     }
 }
 }

+ 4 - 8
src/raft_state.rs

@@ -1,4 +1,6 @@
-use crate::{persister::PersistedRaftState, Index, LogEntry, Peer, Term};
+use crate::{
+    log_array::LogArray, persister::PersistedRaftState, Index, Peer, Term,
+};
 
 
 #[derive(Debug, Eq, PartialEq)]
 #[derive(Debug, Eq, PartialEq)]
 pub(crate) enum State {
 pub(crate) enum State {
@@ -11,7 +13,7 @@ pub(crate) enum State {
 pub(crate) struct RaftState<Command> {
 pub(crate) struct RaftState<Command> {
     pub current_term: Term,
     pub current_term: Term,
     pub voted_for: Option<Peer>,
     pub voted_for: Option<Peer>,
-    pub log: Vec<LogEntry<Command>>,
+    pub log: LogArray<Command>,
 
 
     pub commit_index: Index,
     pub commit_index: Index,
     pub last_applied: Index,
     pub last_applied: Index,
@@ -32,12 +34,6 @@ impl<Command: Clone> RaftState<Command> {
 }
 }
 
 
 impl<Command> RaftState<Command> {
 impl<Command> RaftState<Command> {
-    pub fn last_log_index_and_term(&self) -> (Index, Term) {
-        let len = self.log.len();
-        assert!(len > 0, "There should always be at least one entry in log");
-        (len - 1, self.log.last().unwrap().term)
-    }
-
     pub fn is_leader(&self) -> bool {
     pub fn is_leader(&self) -> bool {
         self.state == State::Leader
         self.state == State::Leader
     }
     }

+ 5 - 3
src/rpcs.rs

@@ -10,7 +10,7 @@ use crate::{
 use serde::de::DeserializeOwned;
 use serde::de::DeserializeOwned;
 use serde::Serialize;
 use serde::Serialize;
 
 
-fn proxy_request_vote<Command: Clone + Serialize>(
+fn proxy_request_vote<Command: Clone + Serialize + Default>(
     raft: &Raft<Command>,
     raft: &Raft<Command>,
     data: RequestMessage,
     data: RequestMessage,
 ) -> ReplyMessage {
 ) -> ReplyMessage {
@@ -25,7 +25,9 @@ fn proxy_request_vote<Command: Clone + Serialize>(
     )
     )
 }
 }
 
 
-fn proxy_append_entries<Command: Clone + Serialize + DeserializeOwned>(
+fn proxy_append_entries<
+    Command: Clone + Serialize + DeserializeOwned + Default,
+>(
     raft: &Raft<Command>,
     raft: &Raft<Command>,
     data: RequestMessage,
     data: RequestMessage,
 ) -> ReplyMessage {
 ) -> ReplyMessage {
@@ -82,7 +84,7 @@ impl RpcClient {
 }
 }
 
 
 pub fn register_server<
 pub fn register_server<
-    Command: 'static + Clone + Serialize + DeserializeOwned,
+    Command: 'static + Clone + Serialize + DeserializeOwned + Default,
     S: AsRef<str>,
     S: AsRef<str>,
 >(
 >(
     raft: Arc<Raft<Command>>,
     raft: Arc<Raft<Command>>,