|
@@ -83,7 +83,7 @@ struct RaftState {
|
|
|
leader_id: Peer,
|
|
leader_id: Peer,
|
|
|
|
|
|
|
|
// Current election cancel token, might be None if no election is running.
|
|
// Current election cancel token, might be None if no election is running.
|
|
|
- election_cancel_token: Option<tokio::sync::oneshot::Sender<Term>>,
|
|
|
|
|
|
|
+ election_cancel_token: Option<futures::channel::oneshot::Sender<Term>>,
|
|
|
// Timer will be removed upon shutdown or elected.
|
|
// Timer will be removed upon shutdown or elected.
|
|
|
election_timer: Option<tokio::time::Delay>,
|
|
election_timer: Option<tokio::time::Delay>,
|
|
|
}
|
|
}
|
|
@@ -261,10 +261,7 @@ impl Raft {
|
|
|
if let Ok(reply) = task_gen(i).await {
|
|
if let Ok(reply) = task_gen(i).await {
|
|
|
return Ok(reply);
|
|
return Ok(reply);
|
|
|
}
|
|
}
|
|
|
- tokio::time::delay_for(tokio::time::Duration::from_millis(
|
|
|
|
|
- (1 << i) * 10,
|
|
|
|
|
- ))
|
|
|
|
|
- .await;
|
|
|
|
|
|
|
+ tokio::time::delay_for(Duration::from_millis((1 << i) * 10)).await;
|
|
|
}
|
|
}
|
|
|
Err(std::io::Error::new(
|
|
Err(std::io::Error::new(
|
|
|
std::io::ErrorKind::TimedOut,
|
|
std::io::ErrorKind::TimedOut,
|
|
@@ -276,7 +273,7 @@ impl Raft {
|
|
|
let (term, last_log_index, last_log_term, cancel_token) = {
|
|
let (term, last_log_index, last_log_term, cancel_token) = {
|
|
|
let mut rf = self.inner_state.lock();
|
|
let mut rf = self.inner_state.lock();
|
|
|
|
|
|
|
|
- let (tx, rx) = tokio::sync::oneshot::channel();
|
|
|
|
|
|
|
+ let (tx, rx) = futures::channel::oneshot::channel();
|
|
|
rf.current_term.0 += 1;
|
|
rf.current_term.0 += 1;
|
|
|
|
|
|
|
|
rf.voted_for = Some(self.me);
|
|
rf.voted_for = Some(self.me);
|
|
@@ -338,14 +335,14 @@ impl Raft {
|
|
|
rf: Arc<Mutex<RaftState>>,
|
|
rf: Arc<Mutex<RaftState>>,
|
|
|
votes: Vec<impl Future<Output = Option<bool>> + Unpin>,
|
|
votes: Vec<impl Future<Output = Option<bool>> + Unpin>,
|
|
|
majority: usize,
|
|
majority: usize,
|
|
|
- cancel_token: tokio::sync::oneshot::Receiver<Term>,
|
|
|
|
|
|
|
+ cancel_token: futures::channel::oneshot::Receiver<Term>,
|
|
|
) {
|
|
) {
|
|
|
let mut vote_count = 0;
|
|
let mut vote_count = 0;
|
|
|
let mut against_count = 0;
|
|
let mut against_count = 0;
|
|
|
let mut cancel_token = cancel_token;
|
|
let mut cancel_token = cancel_token;
|
|
|
let mut futures_vec = votes;
|
|
let mut futures_vec = votes;
|
|
|
while vote_count < majority && against_count <= majority {
|
|
while vote_count < majority && against_count <= majority {
|
|
|
- // Mixing tokio futures with futures-rs ones. Fingers crossed.
|
|
|
|
|
|
|
+ // Running futures-rs futures on tokio. Fingers crossed.
|
|
|
let selected = futures::future::select(
|
|
let selected = futures::future::select(
|
|
|
cancel_token,
|
|
cancel_token,
|
|
|
futures::future::select_all(futures_vec),
|
|
futures::future::select_all(futures_vec),
|
|
@@ -397,7 +394,9 @@ const ELECTION_TIMEOUT_VAR_MILLIS: u64 = 250;
|
|
|
impl RaftState {
|
|
impl RaftState {
|
|
|
fn reset_election_timer(&mut self) {
|
|
fn reset_election_timer(&mut self) {
|
|
|
self.election_timer.as_mut().map(|timer| {
|
|
self.election_timer.as_mut().map(|timer| {
|
|
|
- timer.reset(tokio::time::Instant::now() + Self::election_timeout())
|
|
|
|
|
|
|
+ timer.reset(
|
|
|
|
|
+ (std::time::Instant::now() + Self::election_timeout()).into(),
|
|
|
|
|
+ )
|
|
|
});
|
|
});
|
|
|
}
|
|
}
|
|
|
|
|
|