|
|
@@ -15,6 +15,7 @@ use rand::{thread_rng, Rng};
|
|
|
|
|
|
pub use crate::rpcs::RpcClient;
|
|
|
use crate::utils::{retry_rpc, DropGuard};
|
|
|
+use crossbeam_utils::sync::WaitGroup;
|
|
|
|
|
|
pub mod rpcs;
|
|
|
mod utils;
|
|
|
@@ -85,7 +86,7 @@ pub struct Raft {
|
|
|
|
|
|
thread_pool: Arc<tokio::runtime::Runtime>,
|
|
|
|
|
|
- stop_barrier: Arc<std::sync::Barrier>,
|
|
|
+ stop_wait_group: WaitGroup,
|
|
|
}
|
|
|
|
|
|
#[derive(Clone, Serialize, Deserialize)]
|
|
|
@@ -159,7 +160,6 @@ impl Raft {
|
|
|
.max_threads(peer_size * 2)
|
|
|
.build()
|
|
|
.expect("Creating thread pool should not fail");
|
|
|
- const RUNNING_THREADS: usize = 4;
|
|
|
let mut this = Raft {
|
|
|
inner_state: Arc::new(Mutex::new(state)),
|
|
|
peers,
|
|
|
@@ -169,7 +169,7 @@ impl Raft {
|
|
|
keep_running: Arc::new(Default::default()),
|
|
|
election: Arc::new(election),
|
|
|
thread_pool: Arc::new(thread_pool),
|
|
|
- stop_barrier: Arc::new(std::sync::Barrier::new(RUNNING_THREADS)),
|
|
|
+ stop_wait_group: WaitGroup::new(),
|
|
|
};
|
|
|
|
|
|
// TODO: read persist.
|
|
|
@@ -360,9 +360,10 @@ impl Raft {
|
|
|
cancel_handle.map(|c| c.send(()));
|
|
|
}
|
|
|
|
|
|
- let stop_barrier = this.stop_barrier.clone();
|
|
|
- drop(this);
|
|
|
- stop_barrier.wait();
|
|
|
+ // `this` is dropped right here. We cannot drop(this) anymore.
|
|
|
+ let stop_wait_group = this.stop_wait_group;
|
|
|
+ // Making sure the rest of `this` is dropped before the wait group.
|
|
|
+ drop(stop_wait_group);
|
|
|
})
|
|
|
}
|
|
|
|
|
|
@@ -594,9 +595,10 @@ impl Raft {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- let stop_barrier = this.stop_barrier.clone();
|
|
|
- drop(this);
|
|
|
- stop_barrier.wait();
|
|
|
+ // `this` is dropped right here. We cannot drop(this) anymore.
|
|
|
+ let stop_wait_group = this.stop_wait_group;
|
|
|
+ // Making sure the rest of `this` is dropped before the wait group.
|
|
|
+ drop(stop_wait_group);
|
|
|
})
|
|
|
}
|
|
|
|
|
|
@@ -710,7 +712,7 @@ impl Raft {
|
|
|
let keep_running = self.keep_running.clone();
|
|
|
let rf = self.inner_state.clone();
|
|
|
let condvar = self.apply_command_signal.clone();
|
|
|
- let stop_barrier = self.stop_barrier.clone();
|
|
|
+ let stop_wait_group = self.stop_wait_group.clone();
|
|
|
std::thread::spawn(move || {
|
|
|
while keep_running.load(Ordering::SeqCst) {
|
|
|
let (mut index, commands) = {
|
|
|
@@ -741,7 +743,7 @@ impl Raft {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- stop_barrier.wait();
|
|
|
+ drop(stop_wait_group);
|
|
|
})
|
|
|
}
|
|
|
|
|
|
@@ -774,7 +776,7 @@ impl Raft {
|
|
|
self.election.stop_election_timer();
|
|
|
self.new_log_entry.take().map(|n| n.send(None));
|
|
|
self.apply_command_signal.notify_all();
|
|
|
- self.stop_barrier.wait();
|
|
|
+ self.stop_wait_group.wait();
|
|
|
std::sync::Arc::try_unwrap(self.thread_pool)
|
|
|
.expect(
|
|
|
"All references to the thread pool should have been dropped.",
|