Jelajahi Sumber

Schedule an extra round of append entries after install snapshot.

Install snapshot does not guarantee the target peer will be up-to-date.
Jing Yang 4 tahun lalu
induk
melakukan
7f9e8e72f4
2 mengubah file dengan 65 tambahan dan 2 penghapusan
  1. 63 1
      src/sync_log_entries.rs
  2. 2 1
      test_configs/src/kvraft/generic_test.rs

+ 63 - 1
src/sync_log_entries.rs

@@ -30,6 +30,9 @@ enum SyncLogEntriesResult {
     Success,
 }
 
+#[derive(Clone, Copy, Debug)]
+struct TaskNumber(usize);
+
 // Command must be
 // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
 // 1. clone: they are copied to the persister.
@@ -75,6 +78,7 @@ where
             });
             let openings = openings; // Not mutable beyond this point.
 
+            let mut task_number = 0;
             while let Ok(peer) = rx.recv() {
                 if !this.keep_running.load(Ordering::SeqCst) {
                     break;
@@ -88,6 +92,7 @@ where
                         // Only schedule a new task if the last task has cleared
                         // the queue of RPC requests.
                         if openings[i].0.fetch_add(1, Ordering::SeqCst) == 0 {
+                            task_number += 1;
                             this.thread_pool.spawn(Self::sync_log_entries(
                                 this.inner_state.clone(),
                                 rpc_client.clone(),
@@ -96,6 +101,7 @@ where
                                 openings[i].0.clone(),
                                 this.apply_command_signal.clone(),
                                 this.term_marker(),
+                                TaskNumber(task_number),
                             ));
                         }
                     }
@@ -153,6 +159,7 @@ where
     /// failure of the last case, we will never hit the other failure again,
     /// since in the last case we always sync log entry at a committed index,
     /// and a committed log entry can never diverge.
+    #[allow(clippy::too_many_arguments)]
     async fn sync_log_entries(
         rf: Arc<Mutex<RaftState<Command>>>,
         rpc_client: impl RemoteRaft<Command>,
@@ -161,12 +168,14 @@ where
         opening: Arc<AtomicUsize>,
         apply_command_signal: Arc<Condvar>,
         term_marker: TermMarker<Command>,
+        task_number: TaskNumber,
     ) {
         if opening.swap(0, Ordering::SeqCst) == 0 {
             return;
         }
 
-        let operation = Self::build_sync_log_entries(&rf, peer_index);
+        let operation =
+            Self::build_sync_log_entries(&rf, peer_index, task_number);
         let (term, prev_log_index, match_index, succeeded) = match operation {
             SyncLogEntriesOperation::AppendEntries(args) => {
                 let term = args.term;
@@ -215,6 +224,12 @@ where
                             && rf.log.at(new_commit_index).term
                                 == rf.current_term
                         {
+                            log::info!(
+                                "{:?} moving leader commit index to {} in {:?}",
+                                rf.leader_id,
+                                new_commit_index,
+                                task_number
+                            );
                             // COMMIT_INDEX_INVARIANT, SNAPSHOT_INDEX_INVARIANT:
                             // Index new_commit_index exists in the log array,
                             // which implies new_commit_index is in range
@@ -224,6 +239,38 @@ where
                         }
                     }
                 }
+                // After each round of install snapshot, we must schedule
+                // another round of append entries. The extra round must be run
+                // even if match_index did not move after the snapshot is
+                // installed.
+
+                // For example,
+                // 1. Leader committed index 10, received another request at
+                //    index 11.
+                // 2. Leader sends append entries to all peers.
+                // 3. Leader commits index 11. At this time, append entries to
+                //    peer X has not returned, while other append entries have.
+                // 4. Leader snapshots index 11, received another request at
+                //    index 12.
+                // 5. Leader needs to update peer X, but does not have the
+                //    commit at index 11 any more. Leader then sends install
+                //    snapshot to peer X at index 11.
+                // 6. The original append entries request to peer X returns
+                //    successfully, moving match_index to 11.
+                // 7. The install snapshot request returns successfully.
+                //
+                // The installed snapshot is at index 11, which is already sent
+                // by the previous append entries request. Thus the match index
+                // did not move. Still, we need the extra round of append
+                // entries to peer X for log entry at index 12.
+                if rf.is_leader()
+                    && rf.current_term == term
+                    && prev_log_index == match_index
+                {
+                    // If we did not make any progress this time, try again.
+                    // This can only happen when installing snapshots.
+                    let _ = rerun.send(Some(Peer(peer_index)));
+                }
             }
             Ok(SyncLogEntriesResult::Archived(committed)) => {
                 let mut rf = rf.lock();
@@ -340,6 +387,7 @@ where
     fn build_sync_log_entries(
         rf: &Mutex<RaftState<Command>>,
         peer_index: usize,
+        task_number: TaskNumber,
     ) -> SyncLogEntriesOperation<Command> {
         let rf = rf.lock();
         if !rf.is_leader() {
@@ -350,10 +398,24 @@ where
         // than start(). Otherwise we won't be able to know the log term of the
         // entry right before next_index.
         if rf.next_index[peer_index] > rf.log.start() {
+            log::debug!(
+                "{:?} building append entries {:?} from {} to {:?}",
+                rf.leader_id,
+                task_number,
+                rf.next_index[peer_index] - 1,
+                Peer(peer_index)
+            );
             SyncLogEntriesOperation::AppendEntries(Self::build_append_entries(
                 &rf, peer_index,
             ))
         } else {
+            log::debug!(
+                "{:?} installing snapshot {:?} at {} to {:?}",
+                rf.leader_id,
+                task_number,
+                rf.log.first_index_term().index,
+                Peer(peer_index)
+            );
             SyncLogEntriesOperation::InstallSnapshot(
                 Self::build_install_snapshot(&rf),
             )

+ 2 - 1
test_configs/src/kvraft/generic_test.rs

@@ -48,8 +48,8 @@ fn appending_client(
     clerk.put(&key, &last);
 
     while !stop.load(Ordering::Acquire) {
-        log::debug!("client {} starting {}.", index, op_count);
         if rng.gen_ratio(1, 2) {
+            log::debug!("client {} appending {}.", index, op_count);
             let value = format!("({}, {}), ", index, op_count);
 
             last.push_str(&value);
@@ -57,6 +57,7 @@ fn appending_client(
 
             op_count += 1;
         } else {
+            log::debug!("client {} getting {}.", index, op_count);
             let value = clerk
                 .get(&key)
                 .unwrap_or_else(|| panic!("Key {} should exist.", index));