|
|
@@ -1,18 +1,12 @@
|
|
|
use std::sync::atomic::Ordering;
|
|
|
-use std::sync::Arc;
|
|
|
use std::time::{Duration, Instant};
|
|
|
|
|
|
use parking_lot::{Condvar, Mutex};
|
|
|
use rand::{thread_rng, Rng};
|
|
|
|
|
|
use crate::remote_context::RemoteContext;
|
|
|
-use crate::sync_log_entries::SyncLogEntriesComms;
|
|
|
use crate::utils::{retry_rpc, RPC_DEADLINE};
|
|
|
-use crate::verify_authority::VerifyAuthorityDaemon;
|
|
|
-use crate::{
|
|
|
- Peer, Persister, Raft, RaftState, ReplicableCommand, RequestVoteArgs,
|
|
|
- State, Term,
|
|
|
-};
|
|
|
+use crate::{Peer, Raft, ReplicableCommand, RequestVoteArgs, State, Term};
|
|
|
|
|
|
struct VersionedDeadline {
|
|
|
version: usize,
|
|
|
@@ -254,17 +248,9 @@ impl<Command: ReplicableCommand> Raft<Command> {
|
|
|
|
|
|
let votes = self.spawn_request_votes(args);
|
|
|
let (tx, rx) = futures_channel::oneshot::channel();
|
|
|
- self.thread_pool.spawn(Self::count_vote_util_cancelled(
|
|
|
- me,
|
|
|
- term,
|
|
|
- self.inner_state.clone(),
|
|
|
- votes,
|
|
|
- rx,
|
|
|
- self.election.clone(),
|
|
|
- self.sync_log_entries_comms.clone(),
|
|
|
- self.verify_authority_daemon.clone(),
|
|
|
- self.persister.clone(),
|
|
|
- ));
|
|
|
+ let this = self.clone();
|
|
|
+ self.thread_pool
|
|
|
+ .spawn(this.count_vote_util_cancelled(term, votes, rx));
|
|
|
Some(tx)
|
|
|
}
|
|
|
|
|
|
@@ -340,36 +326,30 @@ impl<Command: ReplicableCommand> Raft<Command> {
|
|
|
return vote_count >= quorum;
|
|
|
}
|
|
|
|
|
|
- #[allow(clippy::too_many_arguments)]
|
|
|
async fn count_vote_util_cancelled(
|
|
|
- me: Peer,
|
|
|
+ self,
|
|
|
term: Term,
|
|
|
- rf: Arc<Mutex<RaftState<Command>>>,
|
|
|
votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
|
|
|
cancel_token: futures_channel::oneshot::Receiver<()>,
|
|
|
- election: Arc<ElectionState>,
|
|
|
- new_log_entry: SyncLogEntriesComms,
|
|
|
- verify_authority_daemon: VerifyAuthorityDaemon,
|
|
|
- persister: Arc<dyn Persister>,
|
|
|
) {
|
|
|
if !Self::quorum_before_cancelled(votes, cancel_token).await {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- let mut rf = rf.lock();
|
|
|
+ let mut rf = self.inner_state.lock();
|
|
|
if rf.current_term == term && rf.state == State::Candidate {
|
|
|
// We are the leader now. The election timer can be stopped.
|
|
|
- election.stop_election_timer();
|
|
|
+ self.election.stop_election_timer();
|
|
|
|
|
|
rf.state = State::Leader;
|
|
|
- rf.leader_id = me;
|
|
|
+ rf.leader_id = self.me;
|
|
|
rf.match_index.fill(0);
|
|
|
|
|
|
let sentinel_commit_index;
|
|
|
if rf.commit_index != rf.log.last_index_term().index {
|
|
|
// Create a sentinel commit at the start of the term.
|
|
|
sentinel_commit_index = rf.log.add_term_change_entry(term);
|
|
|
- persister.save_state(rf.persisted_state().into());
|
|
|
+ self.persister.save_state(rf.persisted_state().into());
|
|
|
} else {
|
|
|
sentinel_commit_index = rf.commit_index;
|
|
|
}
|
|
|
@@ -379,10 +359,12 @@ impl<Command: ReplicableCommand> Raft<Command> {
|
|
|
// authority.
|
|
|
// No verity authority request can go through before the reset is
|
|
|
// done, since we are holding the raft lock.
|
|
|
- verify_authority_daemon.reset_state(term, sentinel_commit_index);
|
|
|
+ self.verify_authority_daemon
|
|
|
+ .reset_state(term, sentinel_commit_index);
|
|
|
|
|
|
// Sync all logs now.
|
|
|
- new_log_entry.reset_progress(term, sentinel_commit_index);
|
|
|
+ self.sync_log_entries_comms
|
|
|
+ .reset_progress(term, sentinel_commit_index);
|
|
|
}
|
|
|
}
|
|
|
}
|