Преглед на файлове

Do not shrink commit index when a snapshot is installed.

The original goal was to avoid the situation where commit index is
larger than log size. However the situation could also happen when
the leader does not send the entire log but moved commit index.

After this change we could guarantee that commit index never moves
back, and avoid the the possibility of delivering a commit at the
same index twice with different values.

The draw back is that sometimes a snapshot is not taken at the
optimal moment.
Jing Yang преди 4 години
родител
ревизия
9e1f78a179
променени са 4 файла, в които са добавени 32 реда и са изтрити 13 реда
  1. 21 8
      src/apply_command.rs
  2. 1 5
      src/install_snapshot.rs
  3. 4 0
      src/lib.rs
  4. 6 0
      src/snapshot.rs

+ 21 - 8
src/apply_command.rs

@@ -35,22 +35,35 @@ where
             while keep_running.load(Ordering::SeqCst) {
                 let messages = {
                     let mut rf = rf.lock();
-                    if rf.last_applied >= rf.commit_index {
+                    if rf.last_applied >= rf.commit_index
+                        || rf.last_applied >= rf.log.last_index_term().index
+                    {
                         condvar.wait_for(
                             &mut rf,
                             Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS),
                         );
                     }
                     if rf.last_applied < rf.log.start() {
-                        rf.last_applied = rf.log.start();
                         let (index_term, data) = rf.log.snapshot();
-                        vec![ApplyCommandMessage::Snapshot(Snapshot {
-                            last_included_index: index_term.index,
-                            data: data.to_vec(),
-                        })]
-                    } else if rf.last_applied < rf.commit_index {
+                        let messages =
+                            vec![ApplyCommandMessage::Snapshot(Snapshot {
+                                last_included_index: index_term.index,
+                                data: data.to_vec(),
+                            })];
+                        rf.last_applied = rf.log.start();
+                        messages
+                    } else if rf.last_applied < rf.commit_index
+                        && rf.last_applied < rf.log.last_index_term().index
+                    {
                         let index = rf.last_applied + 1;
-                        let last_one = rf.commit_index + 1;
+                        // The commit index could be larger than the total
+                        // number of log items, when we installed a snapshot
+                        // from the leader and rolled back too far beyond the
+                        // commit index. The missing log items will be appended
+                        // back by the leader, and will be identical to the
+                        // log items before rolling back.
+                        let last_one =
+                            std::cmp::min(rf.log.end(), rf.commit_index + 1);
                         let messages: Vec<ApplyCommandMessage<Command>> = rf
                             .log
                             .between(index, last_one)

+ 1 - 5
src/install_snapshot.rs

@@ -64,11 +64,7 @@ impl<C: Clone + Default + serde::Serialize> Raft<C> {
                 args.data,
             );
         }
-        // The length of the log might shrink.
-        let last_log_index = rf.log.last_index_term().index;
-        if rf.commit_index > last_log_index {
-            rf.commit_index = last_log_index;
-        }
+
         self.persister.save_snapshot_and_state(
             rf.persisted_state().into(),
             rf.log.snapshot().1,

+ 4 - 0
src/lib.rs

@@ -320,6 +320,10 @@ where
                 rf.log.last_index_term().index
             };
             self.apply_command_signal.notify_one();
+        } else if rf.last_applied < rf.commit_index
+            && rf.last_applied < rf.log.end()
+        {
+            self.apply_command_signal.notify_one();
         }
 
         AppendEntriesReply {

+ 6 - 0
src/snapshot.rs

@@ -82,6 +82,12 @@ impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
                     // installed in the next AppendEntries request.
                     // There is no need to retry, because when the log entries
                     // are re-committed, we will be notified again.
+
+                    // We will not be notified when the log length changes. Thus
+                    // when the log length grows to passing last_included_index
+                    // the first time, no snapshot will be taken, although
+                    // nothing is preventing it to be done. We will wait until
+                    // at least one more entry is committed.
                     continue;
                 }