Преглед на файлове

Validate log content at creation time.

And panic if there is error. This is #5.
Jing Yang преди 4 години
родител
ревизия
3f0c957798
променени са 3 файла, в които са добавени 104 реда и са изтрити 1 реда
  1. 1 1
      src/daemon_env.rs
  2. 5 0
      src/lib.rs
  3. 98 0
      src/log_array.rs

+ 1 - 1
src/daemon_env.rs

@@ -174,7 +174,7 @@ impl DaemonEnv {
         StrippedRaftState {
             current_term: raft.current_term,
             voted_for: raft.voted_for,
-            log: raft.log.all().iter().map(|s| s.into()).collect(),
+            log: raft.log.all_index_term(),
             commit_index: raft.commit_index,
             last_applied: raft.last_applied,
             state: raft.state,

+ 5 - 0
src/lib.rs

@@ -178,6 +178,11 @@ where
             // snapshot must have a valid log.start() and log.end(). Thus
             // log.start() <= commit_index and commit_index < log.end() hold.
             assert!(state.commit_index < state.log.end());
+
+            state
+                .log
+                .validate(state.current_term)
+                .expect("Persisted log should not contain error");
         }
 
         let election = ElectionState::create();

+ 98 - 0
src/log_array.rs

@@ -32,6 +32,13 @@ pub(crate) struct LogArray<C> {
     snapshot: Vec<u8>,
 }
 
+#[derive(Debug)]
+pub(crate) enum ValidationError {
+    IndexMismatch(Index, Vec<IndexTerm>),
+    TermSpike(Index, Vec<IndexTerm>),
+    FutureTerm(Term, Index, Vec<IndexTerm>),
+}
+
 impl<C: Default> LogArray<C> {
     /// Create the initial Raft log with no user-supplied commands.
     pub fn create() -> LogArray<C> {
@@ -96,14 +103,46 @@ impl<C> LogArray<C> {
     }
 
     /// All log entries stored in the array.
+    #[cfg(test)]
     pub fn all(&self) -> &[LogEntry<C>] {
         &self.inner[..]
     }
 
+    /// `IndexTerm` of all log entries, without command.
+    pub fn all_index_term(&self) -> Vec<IndexTerm> {
+        self.inner.iter().map(|e| e.into()).collect()
+    }
+
     /// The snapshot before and including `start()`.
     pub fn snapshot(&self) -> (IndexTerm, &[u8]) {
         (self.first_index_term(), &self.snapshot)
     }
+
+    pub fn validate(&self, current_term: Term) -> Result<(), ValidationError> {
+        let all_index_term = self.all_index_term();
+        let (mut index, mut term) = all_index_term[0].clone().into();
+        for entry in all_index_term[1..].iter() {
+            index += 1;
+            if entry.index != index {
+                return Err(ValidationError::IndexMismatch(
+                    index,
+                    all_index_term,
+                ));
+            }
+            if entry.term < term {
+                return Err(ValidationError::TermSpike(index, all_index_term));
+            }
+            if entry.term > current_term {
+                return Err(ValidationError::FutureTerm(
+                    current_term,
+                    index,
+                    all_index_term,
+                ));
+            }
+            term = entry.term;
+        }
+        Ok(())
+    }
 }
 
 // Mutations
@@ -724,4 +763,63 @@ mod tests {
         assert_eq!(log.first_index_term(), log.last_index_term());
         assert_eq!(((9, Term(7)).into(), [7, 8, 9].as_ref()), log.snapshot());
     }
+
+    #[test]
+    fn test_validate_or_panic_current_term() {
+        let log_array = make_log_array(7);
+        let last_term = log_array.last_index_term().term.0;
+        log_array
+            .validate(Term(last_term))
+            .expect("Validation should not fail");
+        log_array
+            .validate(Term(last_term + 1))
+            .expect("Validation should not fail");
+
+        let err = log_array
+            .validate(Term(last_term - 1))
+            .expect_err("Validation should have failed");
+        assert!(matches!(
+            err,
+            ValidationError::FutureTerm(Term(_last_term), _, _)
+        ));
+    }
+
+    #[test]
+    fn test_validate_or_panic_increasing_term() {
+        let mut log_array = make_log_array(7);
+        let last_term = log_array.last_index_term().term.0;
+        log_array
+            .validate(Term(last_term + 1))
+            .expect("Validation should not fail");
+        log_array.inner[1].term = Term(last_term + 1);
+        let err = log_array
+            .validate(Term(last_term + 1))
+            .expect_err("Validation should have failed");
+        assert!(matches!(err, ValidationError::TermSpike(2, _)));
+    }
+
+    #[test]
+    fn test_validate_or_panic_increasing_index() {
+        let mut log_array = make_log_array_range(7, 10);
+        let last_term = log_array.last_index_term().term.0;
+        // OK
+        log_array.inner[1].index = 8;
+        log_array
+            .validate(Term(last_term + 1))
+            .expect("Validation should not fail");
+
+        // Not 8, error
+        log_array.inner[1].index = 7;
+        let err = log_array
+            .validate(Term(last_term + 1))
+            .expect_err("Validation should have failed");
+        assert!(matches!(err, ValidationError::IndexMismatch(8, _)));
+
+        // Not 8, error
+        log_array.inner[1].index = 9;
+        let err = log_array
+            .validate(Term(last_term + 1))
+            .expect_err("Validation should have failed");
+        assert!(matches!(err, ValidationError::IndexMismatch(8, _)));
+    }
 }