|
|
@@ -249,7 +249,8 @@ impl Raft {
|
|
|
}
|
|
|
|
|
|
fn run_election(&self) {
|
|
|
- let (term, last_log_index, last_log_term, cancel_token) = {
|
|
|
+ let me = self.me;
|
|
|
+ let (term, args, cancel_token) = {
|
|
|
let mut rf = self.inner_state.lock();
|
|
|
|
|
|
let (tx, rx) = futures::channel::oneshot::channel();
|
|
|
@@ -264,16 +265,21 @@ impl Raft {
|
|
|
|
|
|
rf.persist();
|
|
|
|
|
|
+ let term = rf.current_term;
|
|
|
+ let (last_log_index, last_log_term) = rf.last_log_index_and_term();
|
|
|
+
|
|
|
(
|
|
|
- rf.current_term,
|
|
|
- rf.log.len() - 1,
|
|
|
- rf.log.last().unwrap().term,
|
|
|
+ term,
|
|
|
+ RequestVoteArgs {
|
|
|
+ term,
|
|
|
+ candidate_id: me,
|
|
|
+ last_log_index,
|
|
|
+ last_log_term,
|
|
|
+ },
|
|
|
rx,
|
|
|
)
|
|
|
};
|
|
|
|
|
|
- let me = self.me;
|
|
|
-
|
|
|
let mut votes = vec![];
|
|
|
for (index, rpc_client) in self.peers.iter().enumerate() {
|
|
|
if index != self.me.0 {
|
|
|
@@ -284,9 +290,7 @@ impl Raft {
|
|
|
let one_vote = tokio::spawn(Self::request_one_vote(
|
|
|
rpc_client,
|
|
|
term,
|
|
|
- me,
|
|
|
- last_log_index,
|
|
|
- last_log_term,
|
|
|
+ args.clone(),
|
|
|
));
|
|
|
// Futures must be pinned so that they have Unpin, as required
|
|
|
// by futures::future::select.
|
|
|
@@ -303,20 +307,14 @@ impl Raft {
|
|
|
));
|
|
|
}
|
|
|
|
|
|
+ const REQUEST_VOTE_RETRY: usize = 4;
|
|
|
async fn request_one_vote(
|
|
|
rpc_client: RpcClient,
|
|
|
term: Term,
|
|
|
- me: Peer,
|
|
|
- last_log_index: usize,
|
|
|
- last_log_term: Term,
|
|
|
+ args: RequestVoteArgs,
|
|
|
) -> Option<bool> {
|
|
|
- let reply = retry_rpc(4, move |_round| {
|
|
|
- rpc_client.clone().call_request_vote(RequestVoteArgs {
|
|
|
- term,
|
|
|
- candidate_id: me,
|
|
|
- last_log_index,
|
|
|
- last_log_term,
|
|
|
- })
|
|
|
+ let reply = retry_rpc(Self::REQUEST_VOTE_RETRY, move |_round| {
|
|
|
+ rpc_client.clone().call_request_vote(args.clone())
|
|
|
})
|
|
|
.await;
|
|
|
if let Ok(reply) = reply {
|