Ver código fonte

Replace two more asserts with check_or_record.

Jing Yang 4 anos atrás
pai
commit
63d9f1e824
3 arquivos alterados com 28 adições e 2 exclusões
  1. 2 0
      src/daemon_env.rs
  2. 12 1
      src/install_snapshot.rs
  3. 14 1
      src/snapshot.rs

+ 2 - 0
src/daemon_env.rs

@@ -41,6 +41,8 @@ pub(crate) struct Error {
 #[derive(Debug)]
 pub(crate) enum ErrorKind {
     RollbackCommitted(usize),
+    SnapshotBeforeCommitted(usize, Term),
+    SnapshotAfterLogEnd(usize),
 }
 
 impl DaemonEnv {

+ 12 - 1
src/install_snapshot.rs

@@ -1,3 +1,5 @@
+use crate::check_or_record;
+use crate::daemon_env::ErrorKind;
 use crate::index_term::IndexTerm;
 use crate::utils::retry_rpc;
 use crate::{
@@ -81,7 +83,16 @@ impl<C: Clone + Default + serde::Serialize> Raft<C> {
                 rf.log.shift(args.last_included_index, args.data);
             }
         } else {
-            assert!(args.last_included_index > rf.commit_index);
+            check_or_record!(
+                self.daemon_env,
+                args.last_included_index > rf.commit_index,
+                ErrorKind::SnapshotBeforeCommitted(
+                    args.last_included_index,
+                    args.last_included_term
+                ),
+                "Snapshot data is inconsistent with committed log entry.",
+                &rf
+            );
             rf.commit_index = args.last_included_index;
             rf.log.reset(
                 args.last_included_index,

+ 14 - 1
src/snapshot.rs

@@ -4,6 +4,8 @@ use std::sync::Arc;
 use crossbeam_utils::sync::{Parker, Unparker};
 use parking_lot::{Condvar, Mutex};
 
+use crate::check_or_record;
+use crate::daemon_env::ErrorKind;
 use crate::{Index, Raft};
 
 #[derive(Clone, Debug, Default)]
@@ -78,6 +80,7 @@ impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
         let rf = self.inner_state.clone();
         let persister = self.persister.clone();
         let snapshot_daemon = self.snapshot_daemon.clone();
+        let daemon_env = self.daemon_env.clone();
         let stop_wait_group = self.stop_wait_group.clone();
 
         let join_handle = std::thread::spawn(move || loop {
@@ -88,6 +91,7 @@ impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
                 drop(rf);
                 drop(persister);
                 drop(snapshot_daemon);
+                drop(daemon_env);
                 drop(stop_wait_group);
                 break;
             }
@@ -118,7 +122,16 @@ impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
                     continue;
                 }
 
-                assert!(snapshot.last_included_index < rf.log.end());
+                check_or_record!(
+                    daemon_env,
+                    snapshot.last_included_index < rf.log.end(),
+                    ErrorKind::SnapshotAfterLogEnd(
+                        snapshot.last_included_index,
+                    ),
+                    "Snapshot contains data that is not in the log. \
+                     This could happen when logs shrinks.",
+                    &rf
+                );
 
                 rf.log.shift(snapshot.last_included_index, snapshot.data);
                 persister.save_snapshot_and_state(