|
@@ -249,8 +249,13 @@ impl<Command: ReplicableCommand> Raft<Command> {
|
|
|
let votes = self.spawn_request_votes(args);
|
|
let votes = self.spawn_request_votes(args);
|
|
|
let (tx, rx) = futures_channel::oneshot::channel();
|
|
let (tx, rx) = futures_channel::oneshot::channel();
|
|
|
let this = self.clone();
|
|
let this = self.clone();
|
|
|
- self.thread_pool
|
|
|
|
|
- .spawn(this.count_vote_util_cancelled(term, votes, rx));
|
|
|
|
|
|
|
+ self.thread_pool.spawn(async move {
|
|
|
|
|
+ if !Self::quorum_before_cancelled(votes, rx).await {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ this.become_leader(term);
|
|
|
|
|
+ });
|
|
|
Some(tx)
|
|
Some(tx)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -326,16 +331,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
|
|
|
return vote_count >= quorum;
|
|
return vote_count >= quorum;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- async fn count_vote_util_cancelled(
|
|
|
|
|
- self,
|
|
|
|
|
- term: Term,
|
|
|
|
|
- votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
|
|
|
|
|
- cancel_token: futures_channel::oneshot::Receiver<()>,
|
|
|
|
|
- ) {
|
|
|
|
|
- if !Self::quorum_before_cancelled(votes, cancel_token).await {
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
|
|
+ fn become_leader(&self, term: Term) {
|
|
|
let mut rf = self.inner_state.lock();
|
|
let mut rf = self.inner_state.lock();
|
|
|
if rf.current_term == term && rf.state == State::Candidate {
|
|
if rf.current_term == term && rf.state == State::Candidate {
|
|
|
// We are the leader now. The election timer can be stopped.
|
|
// We are the leader now. The election timer can be stopped.
|