Эх сурвалжийг харах

Explain that commit_index is always in range [log.star(), log.end()].

The left half (log.start() <= commit_index) is referred to as
SNAPSHOT_INDEX_INVARIANT. The right half (commit_index < log.end())
is COMMIT_INDEX_INVARIANT.
Jing Yang 4 жил өмнө
parent
commit
b7c3693001

+ 3 - 0
src/daemon_env.rs

@@ -59,6 +59,9 @@ pub(crate) enum ErrorKind {
     SnapshotBeforeCommitted(usize, Term),
     /// The application sent a snapshot that contains items beyond the log end.
     SnapshotAfterLogEnd(usize),
+    /// The application sent a snapshot that contains items that are not
+    /// committed yet. Only committed items are sent to the application.
+    SnapshotNotCommitted(usize),
     /// The recipient of [`crate::InstallSnapshot`] should have been able to
     /// verify the term at index `.0` but did not. The index `.0` is after
     /// their commit index `.1`, and thus not yet committed or archived into a

+ 8 - 0
src/lib.rs

@@ -161,6 +161,10 @@ where
         let peer_size = peers.len();
         assert!(peer_size > me, "My index should be smaller than peer size.");
         let mut state = RaftState::create(peer_size, Peer(me));
+        // COMMIT_INDEX_INVARIANT, SNAPSHOT_INDEX_INVARIANT: Initially
+        // commit_index = log.start() and commit_index + 1 = log.end(). Thus
+        // log.start() <= commit_index and commit_index < log.end() both hold.
+        assert_eq!(state.commit_index + 1, state.log.end());
 
         if let Ok(persisted_state) =
             PersistedRaftState::try_from(persister.read_state())
@@ -169,6 +173,10 @@ where
             state.voted_for = persisted_state.voted_for;
             state.log = persisted_state.log;
             state.commit_index = state.log.start();
+            // COMMIT_INDEX_INVARIANT, SNAPSHOT_INDEX_INVARIANT: the saved
+            // snapshot must have a valid log.start() and log.end(). Thus
+            // log.start() <= commit_index and commit_index < log.end() hold.
+            assert!(state.commit_index < state.log.end());
         }
 
         let election = ElectionState::create();

+ 18 - 0
src/process_append_entries.rs

@@ -49,6 +49,8 @@ where
             };
         }
 
+        // COMMIT_INDEX_INVARIANT: Before this loop, we can safely assume that
+        // commit_index < log.end().
         for (i, entry) in args.entries.iter().enumerate() {
             let index = i + args.prev_log_index + 1;
             if rf.log.end() > index {
@@ -59,20 +61,36 @@ where
                         "Entries before commit index should never be rolled back",
                         &rf
                     );
+                    // COMMIT_INDEX_INVARIANT: log.end() shrinks to index. We
+                    // checked that index is strictly larger than commit_index.
+                    // The condition that commit_index < log.end() holds.
                     rf.log.truncate(index);
                     rf.log.push(entry.clone());
                 }
+                // COMMIT_INDEX_INVARIANT: Otherwise log.end() does not move.
+                // The condition that commit_index < log.end() holds.
             } else {
+                // COMMIT_INDEX_INVARIANT: log.end() grows larger. The condition
+                // that commit_index < log.end() holds.
                 rf.log.push(entry.clone());
             }
         }
+        // COMMIT_INDEX_INVARIANT: After the loop, commit_index < log.end()
+        // must still hold.
 
         self.persister.save_state(rf.persisted_state().into());
 
+        // SNAPSHOT_INDEX_INVARIANT: commit_index increases (or stays unchanged)
+        // after this if-statement. log.start() <= commit_index still holds.
         if args.leader_commit > rf.commit_index {
+            // COMMIT_INDEX_INVARIANT: commit_index < log.end() still holds
+            // after this assignment.
             rf.commit_index = if args.leader_commit < rf.log.end() {
+                // COMMIT_INDEX_INVARIANT: The if-condition guarantees that
+                // leader_commit < log.end().
                 args.leader_commit
             } else {
+                // COMMIT_INDEX_INVARIANT: This is exactly log.end() - 1.
                 rf.log.last_index_term().index
             };
             self.apply_command_signal.notify_one();

+ 9 - 0
src/process_install_snapshot.rs

@@ -61,11 +61,18 @@ impl<C: Clone + Default + serde::Serialize> Raft<C> {
         {
             // Do nothing if the index and term match the current snapshot.
             if args.last_included_index != rf.log.start() {
+                // SNAPSHOT_INDEX_INVARIANT: commit_index increases (or stays
+                // unchanged) after this if-statement. Thus log.start() <=
+                // commit_index still holds.
+                // COMMIT_INDEX_INVARIANT: The condition of the outer if-
+                // statement guarantees that last_included_index is smaller
+                // than log.end().
                 if rf.commit_index < args.last_included_index {
                     rf.commit_index = args.last_included_index;
                 }
                 rf.log.shift(args.last_included_index, args.data);
             }
+            // COMMIT_INDEX_INVARIANT: log.end() does not move.
         } else {
             check_or_record!(
                 args.last_included_index > rf.commit_index,
@@ -76,6 +83,8 @@ impl<C: Clone + Default + serde::Serialize> Raft<C> {
                 "Snapshot data is inconsistent with committed log entry.",
                 &rf
             );
+            // COMMIT_INDEX_INVARIANT, SNAPSHOT_INDEX_INVARIANT: After those two
+            // updates, commit_index is exactly log.start() and log.end() - 1.
             rf.commit_index = args.last_included_index;
             rf.log.reset(
                 args.last_included_index,

+ 13 - 0
src/snapshot.rs

@@ -134,7 +134,20 @@ impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
                      This could happen when logs shrinks.",
                     &rf
                 );
+                check_or_record!(
+                    snapshot.last_included_index <= rf.commit_index,
+                    ErrorKind::SnapshotNotCommitted(
+                        snapshot.last_included_index
+                    ),
+                    "Snapshot contains data that is not committed. \
+                     This should never happen.",
+                    &rf
+                );
 
+                // SNAPSHOT_INDEX_INVARIANT: log.start() is shifted to
+                // last_included_index. We checked that last_included_index is
+                // smaller than commit_index. This is the only place where
+                // log.start() changes.
                 rf.log.shift(snapshot.last_included_index, snapshot.data);
                 persister.save_snapshot_and_state(
                     rf.persisted_state().into(),

+ 4 - 0
src/sync_log_entries.rs

@@ -143,6 +143,10 @@ where
                         if new_commit_index > rf.commit_index
                             && rf.log[new_commit_index].term == rf.current_term
                         {
+                            // COMMIT_INDEX_INVARIANT, SNAPSHOT_INDEX_INVARIANT:
+                            // Index new_commit_index exists in the log array,
+                            // which implies new_commit_index is in range
+                            // [log.start(), log.end()).
                             rf.commit_index = new_commit_index;
                             apply_command_signal.notify_one();
                         }