|
|
@@ -4,6 +4,8 @@ use std::time::Duration;
|
|
|
|
|
|
use parking_lot::{Condvar, Mutex};
|
|
|
|
|
|
+use crate::check_or_record;
|
|
|
+use crate::daemon_env::ErrorKind;
|
|
|
use crate::index_term::IndexTerm;
|
|
|
use crate::utils::{retry_rpc, RPC_DEADLINE};
|
|
|
use crate::{
|
|
|
@@ -148,14 +150,22 @@ where
|
|
|
}
|
|
|
}
|
|
|
Ok(SyncLogEntriesResult::Archived(committed)) => {
|
|
|
- if prev_log_index >= committed.index {
|
|
|
- eprintln!(
|
|
|
- "Peer {} misbehaves: send prev log index {}, got committed {:?}",
|
|
|
+ let mut rf = rf.lock();
|
|
|
+
|
|
|
+ check_or_record!(
|
|
|
+ prev_log_index < committed.index,
|
|
|
+ ErrorKind::RefusedSnapshotAfterCommitted(
|
|
|
+ prev_log_index,
|
|
|
+ committed.index
|
|
|
+ ),
|
|
|
+ format!(
|
|
|
+ "Peer {} misbehaves: claimed log index {} is archived, \
|
|
|
+ but commit index is at {:?}) which is before that",
|
|
|
peer_index, prev_log_index, committed
|
|
|
- );
|
|
|
- }
|
|
|
+ ),
|
|
|
+ &rf
|
|
|
+ );
|
|
|
|
|
|
- let mut rf = rf.lock();
|
|
|
Self::check_committed(&rf, peer, committed.clone());
|
|
|
|
|
|
rf.current_step[peer_index] = 0;
|
|
|
@@ -165,13 +175,20 @@ where
|
|
|
let _ = rerun.send(Some(Peer(peer_index)));
|
|
|
}
|
|
|
Ok(SyncLogEntriesResult::Diverged(committed)) => {
|
|
|
- if prev_log_index < committed.index {
|
|
|
- eprintln!(
|
|
|
- "Peer {} misbehaves: diverged at {}, but committed {:?}",
|
|
|
- peer_index, prev_log_index, committed
|
|
|
- );
|
|
|
- }
|
|
|
let mut rf = rf.lock();
|
|
|
+ check_or_record!(
|
|
|
+ prev_log_index > committed.index,
|
|
|
+ ErrorKind::DivergedBeforeCommitted(
|
|
|
+ prev_log_index,
|
|
|
+ committed.index
|
|
|
+ ),
|
|
|
+ format!(
|
|
|
+ "Peer {} claimed log index {} does not match, \
|
|
|
+ but commit index is at {:?}) which is after that.",
|
|
|
+ peer_index, prev_log_index, committed
|
|
|
+ ),
|
|
|
+ &rf
|
|
|
+ );
|
|
|
Self::check_committed(&rf, peer, committed.clone());
|
|
|
|
|
|
let step = &mut rf.current_step[peer_index];
|
|
|
@@ -216,12 +233,15 @@ where
|
|
|
return;
|
|
|
}
|
|
|
let local_term = rf.log.at(committed.index).term;
|
|
|
- if committed.term != local_term {
|
|
|
- eprintln!(
|
|
|
+ check_or_record!(
|
|
|
+ committed.term == local_term,
|
|
|
+ ErrorKind::DivergedAtCommitted(committed.index),
|
|
|
+ format!(
|
|
|
"{:?} committed log diverged at {:?}: {:?} v.s. leader {:?}",
|
|
|
peer, committed.index, committed.term, local_term
|
|
|
- );
|
|
|
- }
|
|
|
+ ),
|
|
|
+ rf
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
fn build_sync_log_entries(
|