|
|
@@ -14,13 +14,13 @@ use crate::{
|
|
|
#[repr(align(64))]
|
|
|
struct Opening(Arc<AtomicUsize>);
|
|
|
|
|
|
-enum SyncLogEntryOperation<Command> {
|
|
|
+enum SyncLogEntriesOperation<Command> {
|
|
|
AppendEntries(AppendEntriesArgs<Command>),
|
|
|
InstallSnapshot(InstallSnapshotArgs),
|
|
|
None,
|
|
|
}
|
|
|
|
|
|
-enum SyncLogEntryResult {
|
|
|
+enum SyncLogEntriesResult {
|
|
|
TermElapsed(Term),
|
|
|
Archived(IndexTerm),
|
|
|
Diverged(IndexTerm),
|
|
|
@@ -66,7 +66,7 @@ where
|
|
|
// Only schedule a new task if the last task has cleared
|
|
|
// the queue of RPC requests.
|
|
|
if openings[i].0.fetch_add(1, Ordering::SeqCst) == 0 {
|
|
|
- this.thread_pool.spawn(Self::sync_log_entry(
|
|
|
+ this.thread_pool.spawn(Self::sync_log_entries(
|
|
|
this.inner_state.clone(),
|
|
|
rpc_client.clone(),
|
|
|
i,
|
|
|
@@ -87,7 +87,7 @@ where
|
|
|
self.daemon_env.watch_daemon(join_handle);
|
|
|
}
|
|
|
|
|
|
- async fn sync_log_entry(
|
|
|
+ async fn sync_log_entries(
|
|
|
rf: Arc<Mutex<RaftState<Command>>>,
|
|
|
rpc_client: Arc<RpcClient>,
|
|
|
peer_index: usize,
|
|
|
@@ -99,9 +99,9 @@ where
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- let operation = Self::build_sync_log_entry(&rf, peer_index);
|
|
|
+ let operation = Self::build_sync_log_entries(&rf, peer_index);
|
|
|
let (term, prev_log_index, match_index, succeeded) = match operation {
|
|
|
- SyncLogEntryOperation::AppendEntries(args) => {
|
|
|
+ SyncLogEntriesOperation::AppendEntries(args) => {
|
|
|
let term = args.term;
|
|
|
let prev_log_index = args.prev_log_index;
|
|
|
let match_index = args.prev_log_index + args.entries.len();
|
|
|
@@ -109,7 +109,7 @@ where
|
|
|
|
|
|
(term, prev_log_index, match_index, succeeded)
|
|
|
}
|
|
|
- SyncLogEntryOperation::InstallSnapshot(args) => {
|
|
|
+ SyncLogEntriesOperation::InstallSnapshot(args) => {
|
|
|
let term = args.term;
|
|
|
let prev_log_index = args.last_included_index;
|
|
|
let match_index = args.last_included_index;
|
|
|
@@ -117,12 +117,12 @@ where
|
|
|
|
|
|
(term, prev_log_index, match_index, succeeded)
|
|
|
}
|
|
|
- SyncLogEntryOperation::None => return,
|
|
|
+ SyncLogEntriesOperation::None => return,
|
|
|
};
|
|
|
|
|
|
let peer = Peer(peer_index);
|
|
|
match succeeded {
|
|
|
- Ok(SyncLogEntryResult::Success) => {
|
|
|
+ Ok(SyncLogEntriesResult::Success) => {
|
|
|
let mut rf = rf.lock();
|
|
|
|
|
|
if rf.current_term != term {
|
|
|
@@ -147,7 +147,7 @@ where
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- Ok(SyncLogEntryResult::Archived(committed)) => {
|
|
|
+ Ok(SyncLogEntriesResult::Archived(committed)) => {
|
|
|
if prev_log_index >= committed.index {
|
|
|
eprintln!(
|
|
|
"Peer {} misbehaves: send prev log index {}, got committed {:?}",
|
|
|
@@ -164,7 +164,7 @@ where
|
|
|
// Ignore the error. The log syncing thread must have died.
|
|
|
let _ = rerun.send(Some(Peer(peer_index)));
|
|
|
}
|
|
|
- Ok(SyncLogEntryResult::Diverged(committed)) => {
|
|
|
+ Ok(SyncLogEntriesResult::Diverged(committed)) => {
|
|
|
if prev_log_index < committed.index {
|
|
|
eprintln!(
|
|
|
"Peer {} misbehaves: diverged at {}, but committed {:?}",
|
|
|
@@ -195,7 +195,7 @@ where
|
|
|
let _ = rerun.send(Some(Peer(peer_index)));
|
|
|
}
|
|
|
// Do nothing, not our term anymore.
|
|
|
- Ok(SyncLogEntryResult::TermElapsed(_)) => {}
|
|
|
+ Ok(SyncLogEntriesResult::TermElapsed(_)) => {}
|
|
|
Err(_) => {
|
|
|
tokio::time::sleep(Duration::from_millis(
|
|
|
HEARTBEAT_INTERVAL_MILLIS,
|
|
|
@@ -224,24 +224,24 @@ where
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- fn build_sync_log_entry(
|
|
|
+ fn build_sync_log_entries(
|
|
|
rf: &Mutex<RaftState<Command>>,
|
|
|
peer_index: usize,
|
|
|
- ) -> SyncLogEntryOperation<Command> {
|
|
|
+ ) -> SyncLogEntriesOperation<Command> {
|
|
|
let rf = rf.lock();
|
|
|
if !rf.is_leader() {
|
|
|
- return SyncLogEntryOperation::None;
|
|
|
+ return SyncLogEntriesOperation::None;
|
|
|
}
|
|
|
|
|
|
// To send AppendEntries request, next_index must be strictly larger
|
|
|
// than start(). Otherwise we won't be able to know the log term of the
|
|
|
// entry right before next_index.
|
|
|
if rf.next_index[peer_index] > rf.log.start() {
|
|
|
- SyncLogEntryOperation::AppendEntries(Self::build_append_entries(
|
|
|
+ SyncLogEntriesOperation::AppendEntries(Self::build_append_entries(
|
|
|
&rf, peer_index,
|
|
|
))
|
|
|
} else {
|
|
|
- SyncLogEntryOperation::InstallSnapshot(
|
|
|
+ SyncLogEntriesOperation::InstallSnapshot(
|
|
|
Self::build_install_snapshot(&rf),
|
|
|
)
|
|
|
}
|
|
|
@@ -267,7 +267,7 @@ where
|
|
|
async fn append_entries(
|
|
|
rpc_client: &RpcClient,
|
|
|
args: AppendEntriesArgs<Command>,
|
|
|
- ) -> std::io::Result<SyncLogEntryResult> {
|
|
|
+ ) -> std::io::Result<SyncLogEntriesResult> {
|
|
|
let term = args.term;
|
|
|
let reply = retry_rpc(
|
|
|
Self::APPEND_ENTRIES_RETRY,
|
|
|
@@ -278,15 +278,15 @@ where
|
|
|
Ok(if reply.term == term {
|
|
|
if let Some(committed) = reply.committed {
|
|
|
if reply.success {
|
|
|
- SyncLogEntryResult::Archived(committed)
|
|
|
+ SyncLogEntriesResult::Archived(committed)
|
|
|
} else {
|
|
|
- SyncLogEntryResult::Diverged(committed)
|
|
|
+ SyncLogEntriesResult::Diverged(committed)
|
|
|
}
|
|
|
} else {
|
|
|
- SyncLogEntryResult::Success
|
|
|
+ SyncLogEntriesResult::Success
|
|
|
}
|
|
|
} else {
|
|
|
- SyncLogEntryResult::TermElapsed(reply.term)
|
|
|
+ SyncLogEntriesResult::TermElapsed(reply.term)
|
|
|
})
|
|
|
}
|
|
|
|
|
|
@@ -307,7 +307,7 @@ where
|
|
|
async fn install_snapshot(
|
|
|
rpc_client: &RpcClient,
|
|
|
args: InstallSnapshotArgs,
|
|
|
- ) -> std::io::Result<SyncLogEntryResult> {
|
|
|
+ ) -> std::io::Result<SyncLogEntriesResult> {
|
|
|
let term = args.term;
|
|
|
let reply = retry_rpc(
|
|
|
Self::INSTALL_SNAPSHOT_RETRY,
|
|
|
@@ -317,12 +317,12 @@ where
|
|
|
.await?;
|
|
|
Ok(if reply.term == term {
|
|
|
if let Some(committed) = reply.committed {
|
|
|
- SyncLogEntryResult::Archived(committed)
|
|
|
+ SyncLogEntriesResult::Archived(committed)
|
|
|
} else {
|
|
|
- SyncLogEntryResult::Success
|
|
|
+ SyncLogEntriesResult::Success
|
|
|
}
|
|
|
} else {
|
|
|
- SyncLogEntryResult::TermElapsed(reply.term)
|
|
|
+ SyncLogEntriesResult::TermElapsed(reply.term)
|
|
|
})
|
|
|
}
|
|
|
}
|