Jing Yang 5 лет назад
Родитель
Сommit
edfdc2ff65
2 измененных файлов с 34 добавлено и 6 удалено
  1. 1 0
      Cargo.toml
  2. 33 6
      src/lib.rs

+ 1 - 0
Cargo.toml

@@ -10,6 +10,7 @@ edition = "2018"
 bincode = "1.3.1"
 labrpc = { path = "../labrpc" }
 parking_lot = "0.11.0"
+rand = "0.7.3"
 serde = "1.0.116"
 serde_derive = "1.0.116"
 tokio = { version = "0.2.22", features = ["rt-threaded", "time"] }

+ 33 - 6
src/lib.rs

@@ -2,13 +2,16 @@
 
 extern crate bincode;
 extern crate labrpc;
+extern crate rand;
 #[macro_use]
 extern crate serde_derive;
 extern crate tokio;
 
 use crate::rpcs::RpcClient;
 use parking_lot::{Condvar, Mutex};
+use rand::{thread_rng, Rng};
 use std::sync::atomic::AtomicBool;
+use tokio::time::Duration;
 
 pub mod rpcs;
 
@@ -70,7 +73,8 @@ struct RaftState {
     state: State,
 
     leader_id: Peer,
-    // election_timer: timer,
+    // Timer will be removed upon shutdown.
+    election_timer: Option<tokio::time::Delay>,
 }
 
 #[derive(Default)]
@@ -150,8 +154,8 @@ impl Raft {
             rf.voted_for = None;
             rf.state = State::Follower;
             // TODO: quit current election
-            // TODO: reset election timer
-            // TODO: persist
+            rf.reset_election_timer();
+            rf.persist();
         }
 
         let voted_for = rf.voted_for;
@@ -163,8 +167,8 @@ impl Raft {
                     && args.last_log_index >= last_log_index))
         {
             rf.voted_for = Some(args.candidate_id);
-            // TODO: reset election timer.
-            // TODO: persist
+            rf.reset_election_timer();
+            rf.persist();
 
             RequestVoteReply {
                 term: args.term,
@@ -196,7 +200,7 @@ impl Raft {
         }
 
         rf.state = State::Follower;
-        // TODO: reset election timer
+        rf.reset_election_timer();
         // TODO: stop previous election
         rf.leader_id = args.leader_id;
 
@@ -236,3 +240,26 @@ impl Raft {
         }
     }
 }
+
+const HEARTBEAT_INTERVAL_MILLIS: u64 = 150;
+const ELECTION_TIMEOUT_BASE_MILLIS: u64 = 150;
+const ELECTION_TIMEOUT_VAR_MILLIS: u64 = 250;
+
+impl RaftState {
+    fn reset_election_timer(&mut self) {
+        self.election_timer.as_mut().map(|timer| {
+            timer.reset(tokio::time::Instant::now() + Self::election_timeout())
+        });
+    }
+
+    fn election_timeout() -> Duration {
+        Duration::from_millis(
+            ELECTION_TIMEOUT_BASE_MILLIS
+                + thread_rng().gen_range(0, ELECTION_TIMEOUT_VAR_MILLIS),
+        )
+    }
+
+    fn persist(&self) {
+        // TODO: implement
+    }
+}