|
|
@@ -201,42 +201,44 @@ where
|
|
|
Ok(SyncLogEntriesResult::Success) => {
|
|
|
let mut rf = rf.lock();
|
|
|
|
|
|
- if rf.current_term != term {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
check_or_record!(
|
|
|
match_index < rf.log.end(),
|
|
|
ErrorKind::LeaderLogShrunk(match_index),
|
|
|
"The leader log shrunk",
|
|
|
&rf
|
|
|
);
|
|
|
+
|
|
|
+ if !rf.is_leader() {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if rf.current_term != term {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
rf.next_index[peer_index] = match_index + 1;
|
|
|
rf.current_step[peer_index] = 0;
|
|
|
if match_index > rf.match_index[peer_index] {
|
|
|
rf.match_index[peer_index] = match_index;
|
|
|
- if rf.is_leader() && rf.current_term == term {
|
|
|
- let mut matched = rf.match_index.to_vec();
|
|
|
- let mid = matched.len() / 2 + 1;
|
|
|
- matched.sort_unstable();
|
|
|
- let new_commit_index = matched[mid];
|
|
|
- if new_commit_index > rf.commit_index
|
|
|
- && rf.log.at(new_commit_index).term
|
|
|
- == rf.current_term
|
|
|
- {
|
|
|
- log::info!(
|
|
|
- "{:?} moving leader commit index to {} in {:?}",
|
|
|
- rf.leader_id,
|
|
|
- new_commit_index,
|
|
|
- task_number
|
|
|
- );
|
|
|
- // COMMIT_INDEX_INVARIANT, SNAPSHOT_INDEX_INVARIANT:
|
|
|
- // Index new_commit_index exists in the log array,
|
|
|
- // which implies new_commit_index is in range
|
|
|
- // [log.start(), log.end()).
|
|
|
- rf.commit_index = new_commit_index;
|
|
|
- apply_command_signal.notify_one();
|
|
|
- }
|
|
|
+ let mut matched = rf.match_index.to_vec();
|
|
|
+ let mid = matched.len() / 2 + 1;
|
|
|
+ matched.sort_unstable();
|
|
|
+ let new_commit_index = matched[mid];
|
|
|
+ if new_commit_index > rf.commit_index
|
|
|
+ && rf.log.at(new_commit_index).term == rf.current_term
|
|
|
+ {
|
|
|
+ log::info!(
|
|
|
+ "{:?} moving leader commit index to {} in {:?}",
|
|
|
+ rf.leader_id,
|
|
|
+ new_commit_index,
|
|
|
+ task_number
|
|
|
+ );
|
|
|
+ // COMMIT_INDEX_INVARIANT, SNAPSHOT_INDEX_INVARIANT:
|
|
|
+ // Index new_commit_index exists in the log array,
|
|
|
+ // which implies new_commit_index is in range
|
|
|
+ // [log.start(), log.end()).
|
|
|
+ rf.commit_index = new_commit_index;
|
|
|
+ apply_command_signal.notify_one();
|
|
|
}
|
|
|
}
|
|
|
// After each round of install snapshot, we must schedule
|
|
|
@@ -263,10 +265,7 @@ where
|
|
|
// by the previous append entries request. Thus the match index
|
|
|
// did not move. Still, we need the extra round of append
|
|
|
// entries to peer X for log entry at index 12.
|
|
|
- if rf.is_leader()
|
|
|
- && rf.current_term == term
|
|
|
- && prev_log_index == match_index
|
|
|
- {
|
|
|
+ if prev_log_index == match_index {
|
|
|
// If we did not make any progress this time, try again.
|
|
|
// This can only happen when installing snapshots.
|
|
|
let _ = rerun.send(Some(Peer(peer_index)));
|