|
|
@@ -7,7 +7,7 @@ extern crate serde_derive;
|
|
|
extern crate tokio;
|
|
|
|
|
|
use std::convert::TryFrom;
|
|
|
-use std::sync::atomic::{AtomicBool, Ordering};
|
|
|
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
|
|
use std::sync::Arc;
|
|
|
use std::time::{Duration, Instant};
|
|
|
|
|
|
@@ -600,6 +600,10 @@ impl Raft {
|
|
|
// Clone everything that the thread needs.
|
|
|
let this = self.clone();
|
|
|
std::thread::spawn(move || {
|
|
|
+ let mut openings = vec![];
|
|
|
+ openings.resize_with(this.peers.len(), || AtomicUsize::new(0));
|
|
|
+ let openings = Arc::new(openings); // Not mutable beyond this point.
|
|
|
+
|
|
|
while let Ok(peer) = rx.recv() {
|
|
|
if !this.keep_running.load(Ordering::SeqCst) {
|
|
|
break;
|
|
|
@@ -610,11 +614,13 @@ impl Raft {
|
|
|
for (i, rpc_client) in this.peers.iter().enumerate() {
|
|
|
if i != this.me.0 && peer.map(|p| p.0 == i).unwrap_or(true)
|
|
|
{
|
|
|
+ openings[i].fetch_add(1, Ordering::SeqCst);
|
|
|
this.thread_pool.spawn(Self::sync_log_entry(
|
|
|
this.inner_state.clone(),
|
|
|
rpc_client.clone(),
|
|
|
i,
|
|
|
this.new_log_entry.clone().unwrap(),
|
|
|
+ openings.clone(),
|
|
|
this.apply_command_signal.clone(),
|
|
|
));
|
|
|
}
|
|
|
@@ -633,8 +639,14 @@ impl Raft {
|
|
|
rpc_client: RpcClient,
|
|
|
peer_index: usize,
|
|
|
rerun: std::sync::mpsc::Sender<Option<Peer>>,
|
|
|
+ openings: Arc<Vec<AtomicUsize>>,
|
|
|
apply_command_signal: Arc<Condvar>,
|
|
|
) {
|
|
|
+ let opening = &openings[peer_index];
|
|
|
+ if opening.swap(0, Ordering::SeqCst) == 0 {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
// TODO: cancel in flight changes?
|
|
|
let args = match Self::build_append_entries(&rf, peer_index) {
|
|
|
Some(args) => args,
|