ソースを参照

Do not shrink log beyond log start.

We never deliver a log entry to the application if it is not
committed. In addition, we never snapshot a log entry if it is not
accepted by the application.

Combining those two, we never snapshot a log entry if it is not
committed. That is to say, the snapshot only contains committed
entries. Furthermore, a committed entry can never be rolled back.
Thus we never need to rollback any snapshot, which is the log start.

To achieve that effect, the commit index is sent back to the leader
so that the leader won't send logs beyond that point. Append entries
and install snapshot requests are still accepted if we can check the
previous log entry, even if they are before (<) the commit index. If
the entry cannot be verified (i.e. before log start), we respond with
a failure plus the commit index.

After this change, we can be sure that commit index is always within
the current log range, with the exception of newly restored logs. In
that case the commit index is simply set to the log start.

Coincidentally, a lot of changes made in the previous commit were
reverted. That does not mean they are wrong. The cases were greatly
simplified after strict log rules are imposed.
Jing Yang 4 年 前
コミット
ff9b9bcd4c
5 ファイル変更146 行追加34 行削除
  1. 3 14
      src/apply_command.rs
  2. 1 1
      src/index_term.rs
  3. 40 4
      src/install_snapshot.rs
  4. 83 14
      src/lib.rs
  5. 19 1
      src/log_array.rs

+ 3 - 14
src/apply_command.rs

@@ -35,9 +35,7 @@ where
             while keep_running.load(Ordering::SeqCst) {
                 let messages = {
                     let mut rf = rf.lock();
-                    if rf.last_applied >= rf.commit_index
-                        || rf.last_applied >= rf.log.last_index_term().index
-                    {
+                    if rf.last_applied >= rf.commit_index {
                         condvar.wait_for(
                             &mut rf,
                             Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS),
@@ -52,18 +50,9 @@ where
                             })];
                         rf.last_applied = rf.log.start();
                         messages
-                    } else if rf.last_applied < rf.commit_index
-                        && rf.last_applied < rf.log.last_index_term().index
-                    {
+                    } else if rf.last_applied < rf.commit_index {
                         let index = rf.last_applied + 1;
-                        // The commit index could be larger than the total
-                        // number of log items, when we installed a snapshot
-                        // from the leader and rolled back too far beyond the
-                        // commit index. The missing log items will be appended
-                        // back by the leader, and will be identical to the
-                        // log items before rolling back.
-                        let last_one =
-                            std::cmp::min(rf.log.end(), rf.commit_index + 1);
+                        let last_one = rf.commit_index + 1;
                         let messages: Vec<ApplyCommandMessage<Command>> = rf
                             .log
                             .between(index, last_one)

+ 1 - 1
src/index_term.rs

@@ -1,6 +1,6 @@
 use crate::{Index, LogEntry, Term};
 
-#[derive(Clone, Debug, Eq, PartialEq)]
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
 pub(crate) struct IndexTerm {
     pub index: Index,
     pub term: Term,

+ 40 - 4
src/install_snapshot.rs

@@ -1,6 +1,8 @@
+use crate::index_term::IndexTerm;
 use crate::utils::retry_rpc;
 use crate::{
-    Index, Peer, Raft, RaftState, RpcClient, State, Term, RPC_DEADLINE,
+    Index, Peer, Raft, RaftState, RpcClient, State, SyncLogEntryResult, Term,
+    RPC_DEADLINE,
 };
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -18,6 +20,7 @@ pub(crate) struct InstallSnapshotArgs {
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub(crate) struct InstallSnapshotReply {
     term: Term,
+    committed: Option<IndexTerm>,
 }
 
 impl<C: Clone + Default + serde::Serialize> Raft<C> {
@@ -33,6 +36,7 @@ impl<C: Clone + Default + serde::Serialize> Raft<C> {
         if rf.current_term > args.term {
             return InstallSnapshotReply {
                 term: rf.current_term,
+                committed: None,
             };
         }
 
@@ -49,15 +53,36 @@ impl<C: Clone + Default + serde::Serialize> Raft<C> {
 
         // The above code is exactly the same as AppendEntries.
 
+        // The snapshot could not be verified because the index is beyond log
+        // start. Fail this request and ask leader to send something that we
+        // could verify. We cannot rollback to a point beyond commit index
+        // anyway. Otherwise if the system fails right after the rollback,
+        // committed entries before log start would be lost forever.
+        //
+        // The commit index is sent back to leader. The leader would never need
+        // to rollback beyond that, since it is guaranteed that committed log
+        // entries will never be rolled back.
+        if args.last_included_index < rf.log.start() {
+            return InstallSnapshotReply {
+                term: args.term,
+                committed: Some(rf.log.first_after(rf.commit_index).into()),
+            };
+        }
+
         if args.last_included_index < rf.log.end()
             && args.last_included_index >= rf.log.start()
             && args.last_included_term == rf.log[args.last_included_index].term
         {
             // Do nothing if the index and term match the current snapshot.
             if args.last_included_index != rf.log.start() {
+                if rf.commit_index < args.last_included_index {
+                    rf.commit_index = args.last_included_index;
+                }
                 rf.log.shift(args.last_included_index, args.data);
             }
         } else {
+            assert!(args.last_included_index > rf.commit_index);
+            rf.commit_index = args.last_included_index;
             rf.log.reset(
                 args.last_included_index,
                 args.last_included_term,
@@ -71,7 +96,10 @@ impl<C: Clone + Default + serde::Serialize> Raft<C> {
         );
 
         self.apply_command_signal.notify_one();
-        InstallSnapshotReply { term: args.term }
+        InstallSnapshotReply {
+            term: args.term,
+            committed: None,
+        }
     }
 
     pub(crate) fn build_install_snapshot(
@@ -93,7 +121,7 @@ impl<C: Clone + Default + serde::Serialize> Raft<C> {
     pub(crate) async fn send_install_snapshot(
         rpc_client: &RpcClient,
         args: InstallSnapshotArgs,
-    ) -> std::io::Result<Option<bool>> {
+    ) -> std::io::Result<SyncLogEntryResult> {
         let term = args.term;
         let reply = retry_rpc(
             Self::INSTALL_SNAPSHOT_RETRY,
@@ -101,6 +129,14 @@ impl<C: Clone + Default + serde::Serialize> Raft<C> {
             move |_round| rpc_client.call_install_snapshot(args.clone()),
         )
         .await?;
-        Ok(if reply.term == term { Some(true) } else { None })
+        Ok(if reply.term == term {
+            if let Some(committed) = reply.committed {
+                SyncLogEntryResult::Archived(committed)
+            } else {
+                SyncLogEntryResult::Success
+            }
+        } else {
+            SyncLogEntryResult::TermElapsed(reply.term)
+        })
     }
 }

+ 83 - 14
src/lib.rs

@@ -18,6 +18,7 @@ use rand::{thread_rng, Rng};
 
 use crate::apply_command::ApplyCommandFnMut;
 pub use crate::apply_command::ApplyCommandMessage;
+use crate::index_term::IndexTerm;
 use crate::install_snapshot::InstallSnapshotArgs;
 use crate::persister::PersistedRaftState;
 pub use crate::persister::Persister;
@@ -109,6 +110,7 @@ struct AppendEntriesArgs<Command> {
 struct AppendEntriesReply {
     term: Term,
     success: bool,
+    committed: Option<IndexTerm>,
 }
 
 #[repr(align(64))]
@@ -162,6 +164,7 @@ where
             state.current_term = persisted_state.current_term;
             state.voted_for = persisted_state.voted_for;
             state.log = persisted_state.log;
+            state.commit_index = state.log.start();
         }
 
         let election = ElectionState {
@@ -275,6 +278,7 @@ where
             return AppendEntriesReply {
                 term: rf.current_term,
                 success: false,
+                committed: Some(rf.log.first_after(rf.commit_index).into()),
             };
         }
 
@@ -295,7 +299,8 @@ where
         {
             return AppendEntriesReply {
                 term: args.term,
-                success: false,
+                success: args.prev_log_index < rf.log.start(),
+                committed: Some(rf.log.first_after(rf.commit_index).into()),
             };
         }
 
@@ -303,6 +308,10 @@ where
             let index = i + args.prev_log_index + 1;
             if rf.log.end() > index {
                 if rf.log[index].term != entry.term {
+                    assert!(
+                        index > rf.commit_index,
+                        "Entries before commit index should never be rolled back"
+                    );
                     rf.log.truncate(index);
                     rf.log.push(entry.clone());
                 }
@@ -320,15 +329,12 @@ where
                 rf.log.last_index_term().index
             };
             self.apply_command_signal.notify_one();
-        } else if rf.last_applied < rf.commit_index
-            && rf.last_applied < rf.log.end()
-        {
-            self.apply_command_signal.notify_one();
         }
 
         AppendEntriesReply {
             term: args.term,
             success: true,
+            committed: None,
         }
     }
 }
@@ -339,6 +345,13 @@ enum SyncLogEntryOperation<Command> {
     None,
 }
 
+enum SyncLogEntryResult {
+    TermElapsed(Term),
+    Archived(IndexTerm),
+    Diverged(IndexTerm),
+    Success,
+}
+
 // Command must be
 // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
 // 1. clone: they are copied to the persister.
@@ -705,27 +718,30 @@ where
         }
 
         let operation = Self::build_sync_log_entry(&rf, peer_index);
-        let (term, match_index, succeeded) = match operation {
+        let (term, prev_log_index, match_index, succeeded) = match operation {
             SyncLogEntryOperation::AppendEntries(args) => {
                 let term = args.term;
+                let prev_log_index = args.prev_log_index;
                 let match_index = args.prev_log_index + args.entries.len();
                 let succeeded = Self::append_entries(&rpc_client, args).await;
 
-                (term, match_index, succeeded)
+                (term, prev_log_index, match_index, succeeded)
             }
             SyncLogEntryOperation::InstallSnapshot(args) => {
                 let term = args.term;
+                let prev_log_index = args.last_included_index;
                 let match_index = args.last_included_index;
                 let succeeded =
                     Self::send_install_snapshot(&rpc_client, args).await;
 
-                (term, match_index, succeeded)
+                (term, prev_log_index, match_index, succeeded)
             }
             SyncLogEntryOperation::None => return,
         };
 
+        let peer = Peer(peer_index);
         match succeeded {
-            Ok(Some(true)) => {
+            Ok(SyncLogEntryResult::Success) => {
                 let mut rf = rf.lock();
 
                 if rf.current_term != term {
@@ -750,8 +766,32 @@ where
                     }
                 }
             }
-            Ok(Some(false)) => {
+            Ok(SyncLogEntryResult::Archived(committed)) => {
+                if prev_log_index >= committed.index {
+                    eprintln!(
+                        "Peer {} misbehaves: send prev log index {}, got committed {:?}",
+                        peer_index, prev_log_index, committed
+                    );
+                }
+
+                let mut rf = rf.lock();
+                Self::check_committed(&rf, peer, committed.clone());
+
+                rf.current_step[peer_index] = 0;
+                rf.next_index[peer_index] = committed.index;
+
+                // Ignore the error. The log syncing thread must have died.
+                let _ = rerun.send(Some(Peer(peer_index)));
+            }
+            Ok(SyncLogEntryResult::Diverged(committed)) => {
+                if prev_log_index < committed.index {
+                    eprintln!(
+                        "Peer {} misbehaves: diverged at {}, but committed {:?}",
+                        peer_index, prev_log_index, committed
+                    );
+                }
                 let mut rf = rf.lock();
+                Self::check_committed(&rf, peer, committed.clone());
 
                 let step = &mut rf.current_step[peer_index];
                 if *step < 5 {
@@ -766,11 +806,15 @@ where
                     *next_index -= diff;
                 }
 
+                if *next_index < committed.index {
+                    *next_index = committed.index;
+                }
+
                 // Ignore the error. The log syncing thread must have died.
                 let _ = rerun.send(Some(Peer(peer_index)));
             }
             // Do nothing, not our term anymore.
-            Ok(None) => {}
+            Ok(SyncLogEntryResult::TermElapsed(_)) => {}
             Err(_) => {
                 tokio::time::sleep(Duration::from_millis(
                     HEARTBEAT_INTERVAL_MILLIS,
@@ -782,6 +826,23 @@ where
         };
     }
 
+    fn check_committed(
+        rf: &RaftState<Command>,
+        peer: Peer,
+        committed: IndexTerm,
+    ) {
+        if committed.index < rf.log.start() {
+            return;
+        }
+        let local_term = rf.log.at(committed.index).term;
+        if committed.term != local_term {
+            eprintln!(
+                "{:?} committed log diverged at {:?}: {:?} v.s. leader {:?}",
+                peer, committed.index, committed.term, local_term
+            );
+        }
+    }
+
     fn build_sync_log_entry(
         rf: &Mutex<RaftState<Command>>,
         peer_index: usize,
@@ -825,7 +886,7 @@ where
     async fn append_entries(
         rpc_client: &RpcClient,
         args: AppendEntriesArgs<Command>,
-    ) -> std::io::Result<Option<bool>> {
+    ) -> std::io::Result<SyncLogEntryResult> {
         let term = args.term;
         let reply = retry_rpc(
             Self::APPEND_ENTRIES_RETRY,
@@ -834,9 +895,17 @@ where
         )
         .await?;
         Ok(if reply.term == term {
-            Some(reply.success)
+            if let Some(committed) = reply.committed {
+                if reply.success {
+                    SyncLogEntryResult::Archived(committed)
+                } else {
+                    SyncLogEntryResult::Diverged(committed)
+                }
+            } else {
+                SyncLogEntryResult::Success
+            }
         } else {
-            None
+            SyncLogEntryResult::TermElapsed(reply.term)
         })
     }
 

+ 19 - 1
src/log_array.rs

@@ -73,6 +73,15 @@ impl<C> LogArray<C> {
         &self.inner[index]
     }
 
+    /// The first log entry on or after the given index.
+    pub fn first_after(&self, index: Index) -> &LogEntry<C> {
+        if index >= self.start() {
+            self.at(index)
+        } else {
+            self.first_entry()
+        }
+    }
+
     /// All log entries after the given index.
     pub fn after(&self, index: Index) -> &[LogEntry<C>] {
         let index = self.check_range_index(index);
@@ -87,7 +96,7 @@ impl<C> LogArray<C> {
     }
 
     /// All log entries stored in the array.
-    #[allow(dead_code)]
+    #[cfg(test)]
     pub fn all(&self) -> &[LogEntry<C>] {
         &self.inner[..]
     }
@@ -372,6 +381,15 @@ mod tests {
         assert!(at_after_end.is_err());
     }
 
+    #[test]
+    fn test_first_after() {
+        let (start, _, log) = default_log_array();
+        assert_eq!(log.first_after(0).index, log.first_entry().index);
+        assert_eq!(log.first_after(start).index, log.at(start).index);
+        assert_eq!(log.first_after(start + 1).index, log.at(start + 1).index);
+        assert_ne!(log.first_after(0).index, log.first_after(start + 1).index);
+    }
+
     #[test]
     fn test_index_operator() {
         let (start, end, log) = default_log_array();