|
|
@@ -660,47 +660,45 @@ impl Raft {
|
|
|
let match_index = args.prev_log_index + args.entries.len();
|
|
|
let succeeded = Self::append_entries(rpc_client, args).await;
|
|
|
match succeeded {
|
|
|
- Ok(Some(succeeded)) => {
|
|
|
- if succeeded {
|
|
|
- let mut rf = rf.lock();
|
|
|
- rf.next_index[peer_index] = match_index + 1;
|
|
|
- rf.current_step[peer_index] = 0;
|
|
|
- if match_index > rf.match_index[peer_index] {
|
|
|
- rf.match_index[peer_index] = match_index;
|
|
|
- if rf.is_leader() && rf.current_term == term {
|
|
|
- let mut matched = rf.match_index.to_vec();
|
|
|
- let mid = matched.len() / 2 + 1;
|
|
|
- matched.sort();
|
|
|
- let new_commit_index = matched[mid];
|
|
|
- if new_commit_index > rf.commit_index
|
|
|
- && rf.log[new_commit_index].term
|
|
|
- == rf.current_term
|
|
|
- {
|
|
|
- rf.commit_index = new_commit_index;
|
|
|
- apply_command_signal.notify_one();
|
|
|
- }
|
|
|
+ Ok(Some(true)) => {
|
|
|
+ let mut rf = rf.lock();
|
|
|
+ rf.next_index[peer_index] = match_index + 1;
|
|
|
+ rf.current_step[peer_index] = 0;
|
|
|
+ if match_index > rf.match_index[peer_index] {
|
|
|
+ rf.match_index[peer_index] = match_index;
|
|
|
+ if rf.is_leader() && rf.current_term == term {
|
|
|
+ let mut matched = rf.match_index.to_vec();
|
|
|
+ let mid = matched.len() / 2 + 1;
|
|
|
+ matched.sort();
|
|
|
+ let new_commit_index = matched[mid];
|
|
|
+ if new_commit_index > rf.commit_index
|
|
|
+ && rf.log[new_commit_index].term == rf.current_term
|
|
|
+ {
|
|
|
+ rf.commit_index = new_commit_index;
|
|
|
+ apply_command_signal.notify_one();
|
|
|
}
|
|
|
}
|
|
|
- } else {
|
|
|
- let mut rf = rf.lock();
|
|
|
-
|
|
|
- let step = &mut rf.current_step[peer_index];
|
|
|
- if *step < 5 {
|
|
|
- *step += 1;
|
|
|
- }
|
|
|
- let diff = (1 << 8) << *step;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Ok(Some(false)) => {
|
|
|
+ let mut rf = rf.lock();
|
|
|
|
|
|
- let next_index = &mut rf.next_index[peer_index];
|
|
|
- if diff >= *next_index {
|
|
|
- *next_index = 1usize;
|
|
|
- } else {
|
|
|
- *next_index -= diff;
|
|
|
- }
|
|
|
+ let step = &mut rf.current_step[peer_index];
|
|
|
+ if *step < 5 {
|
|
|
+ *step += 1;
|
|
|
+ }
|
|
|
+ let diff = (1 << 8) << *step;
|
|
|
|
|
|
- rerun
|
|
|
- .send(Some(Peer(peer_index)))
|
|
|
- .expect("Triggering log entry syncing should not fail");
|
|
|
+ let next_index = &mut rf.next_index[peer_index];
|
|
|
+ if diff >= *next_index {
|
|
|
+ *next_index = 1usize;
|
|
|
+ } else {
|
|
|
+ *next_index -= diff;
|
|
|
}
|
|
|
+
|
|
|
+ rerun
|
|
|
+ .send(Some(Peer(peer_index)))
|
|
|
+ .expect("Triggering log entry syncing should not fail");
|
|
|
}
|
|
|
// Do nothing, not our term anymore.
|
|
|
Ok(None) => {}
|