|
@@ -458,48 +458,12 @@ impl Raft {
|
|
|
while let Ok(peer) = rx.recv() {
|
|
while let Ok(peer) = rx.recv() {
|
|
|
for (i, rpc_client) in peers.iter().enumerate() {
|
|
for (i, rpc_client) in peers.iter().enumerate() {
|
|
|
if i != me.0 && peer.map(|p| p.0 == i).unwrap_or(true) {
|
|
if i != me.0 && peer.map(|p| p.0 == i).unwrap_or(true) {
|
|
|
- let rf = rf.clone();
|
|
|
|
|
- let rpc_client = rpc_client.clone();
|
|
|
|
|
- let rerun = rerun.clone();
|
|
|
|
|
- let peer_index = i;
|
|
|
|
|
- tokio::spawn(async move {
|
|
|
|
|
- // TODO: cancel in flight changes?
|
|
|
|
|
- let args =
|
|
|
|
|
- Self::build_append_entries(&rf, peer_index);
|
|
|
|
|
- let succeeded =
|
|
|
|
|
- Self::append_entries(rpc_client, args).await;
|
|
|
|
|
- match succeeded {
|
|
|
|
|
- Ok(done) => {
|
|
|
|
|
- if !done {
|
|
|
|
|
- let mut rf = rf.lock();
|
|
|
|
|
-
|
|
|
|
|
- let step =
|
|
|
|
|
- &mut rf.current_step[peer_index];
|
|
|
|
|
- *step += 1;
|
|
|
|
|
- let diff = (1 << 8) << *step;
|
|
|
|
|
-
|
|
|
|
|
- let next_index =
|
|
|
|
|
- &mut rf.next_index[peer_index];
|
|
|
|
|
- if diff >= *next_index {
|
|
|
|
|
- *next_index = 1usize;
|
|
|
|
|
- } else {
|
|
|
|
|
- *next_index -= diff;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- rerun.send(Some(Peer(peer_index)));
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- Err(_) => {
|
|
|
|
|
- tokio::time::delay_for(
|
|
|
|
|
- Duration::from_millis(
|
|
|
|
|
- HEARTBEAT_INTERVAL_MILLIS,
|
|
|
|
|
- ),
|
|
|
|
|
- )
|
|
|
|
|
- .await;
|
|
|
|
|
- rerun.send(Some(Peer(peer_index)));
|
|
|
|
|
- }
|
|
|
|
|
- };
|
|
|
|
|
- });
|
|
|
|
|
|
|
+ tokio::spawn(Self::sync_log_entry(
|
|
|
|
|
+ rf.clone(),
|
|
|
|
|
+ rpc_client.clone(),
|
|
|
|
|
+ i,
|
|
|
|
|
+ rerun.clone(),
|
|
|
|
|
+ ));
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -508,6 +472,44 @@ impl Raft {
|
|
|
(handle, tx)
|
|
(handle, tx)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ async fn sync_log_entry(
|
|
|
|
|
+ rf: Arc<Mutex<RaftState>>,
|
|
|
|
|
+ rpc_client: RpcClient,
|
|
|
|
|
+ peer_index: usize,
|
|
|
|
|
+ rerun: std::sync::mpsc::Sender<Option<Peer>>,
|
|
|
|
|
+ ) {
|
|
|
|
|
+ // TODO: cancel in flight changes?
|
|
|
|
|
+ let args = Self::build_append_entries(&rf, peer_index);
|
|
|
|
|
+ let succeeded = Self::append_entries(rpc_client, args).await;
|
|
|
|
|
+ match succeeded {
|
|
|
|
|
+ Ok(done) => {
|
|
|
|
|
+ if !done {
|
|
|
|
|
+ let mut rf = rf.lock();
|
|
|
|
|
+
|
|
|
|
|
+ let step = &mut rf.current_step[peer_index];
|
|
|
|
|
+ *step += 1;
|
|
|
|
|
+ let diff = (1 << 8) << *step;
|
|
|
|
|
+
|
|
|
|
|
+ let next_index = &mut rf.next_index[peer_index];
|
|
|
|
|
+ if diff >= *next_index {
|
|
|
|
|
+ *next_index = 1usize;
|
|
|
|
|
+ } else {
|
|
|
|
|
+ *next_index -= diff;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ rerun.send(Some(Peer(peer_index)));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ Err(_) => {
|
|
|
|
|
+ tokio::time::delay_for(Duration::from_millis(
|
|
|
|
|
+ HEARTBEAT_INTERVAL_MILLIS,
|
|
|
|
|
+ ))
|
|
|
|
|
+ .await;
|
|
|
|
|
+ rerun.send(Some(Peer(peer_index)));
|
|
|
|
|
+ }
|
|
|
|
|
+ };
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
fn build_append_entries(
|
|
fn build_append_entries(
|
|
|
rf: &Arc<Mutex<RaftState>>,
|
|
rf: &Arc<Mutex<RaftState>>,
|
|
|
peer_index: usize,
|
|
peer_index: usize,
|