Browse Source

Implement request vote RPC serving.

Jing Yang 5 years ago
parent
commit
6a1b7b98bf
1 changed files with 56 additions and 12 deletions
  1. 56 12
      src/lib.rs

+ 56 - 12
src/lib.rs

@@ -26,12 +26,18 @@ impl Default for State {
     }
 }
 
+struct LogEntry {
+    term: usize,
+    index: usize,
+    // TODO: Allow sending of arbitrary information.
+    command: usize,
+}
+
 #[derive(Default)]
 struct RaftState {
     current_term: usize,
-    voted_for: i64,
-    // TODO: Allow sending of arbitrary information.
-    log: Vec<usize>,
+    voted_for: Option<usize>,
+    log: Vec<LogEntry>,
 
     commit_index: usize,
     last_applied: usize,
@@ -59,39 +65,77 @@ struct Raft {
     // new_log_entry: Sender<usize>,
     // new_log_entry: Receiver<usize>,
     // apply_command_cond: Condvar
-
     keep_running: AtomicBool,
-
     // applyCh: Sender<ApplyMsg>
 }
 
 #[derive(Serialize, Deserialize)]
 struct RequestVoteArgs {
-    term: i64,
+    term: usize,
+    candidate_id: usize,
+    last_log_index: usize,
+    last_log_term: usize,
 }
 
 #[derive(Serialize, Deserialize)]
 struct RequestVoteReply {
-    term: i64,
+    term: usize,
+    vote_granted: bool,
 }
 
 #[derive(Serialize, Deserialize)]
 struct AppendEntriesArgs {
-    term: i64,
+    term: usize,
 }
 
 #[derive(Serialize, Deserialize)]
 struct AppendEntriesReply {
-    term: i64,
+    term: usize,
 }
 
 impl Raft {
     pub(crate) fn process_request_vote(
         &self,
-        request: RequestVoteArgs,
+        args: RequestVoteArgs,
     ) -> RequestVoteReply {
-        RequestVoteReply {
-            term: request.term + 1,
+        let mut rf = self.inner_state.lock();
+
+        let term = rf.current_term;
+        if args.term < term {
+            return RequestVoteReply {
+                term,
+                vote_granted: false,
+            };
+        } else if args.term > term {
+            rf.current_term = args.term;
+            rf.voted_for = None;
+            rf.state = State::Follower;
+            // TODO: quit current election
+            // TODO: reset election timer
+            // TODO: persist
+        }
+
+        let voted_for = rf.voted_for;
+        let last_log_index = rf.log.len() - 1;
+        let last_log_term = rf.log.last().unwrap().term;
+        if (voted_for.is_none() || voted_for == Some(args.candidate_id))
+            && (args.last_log_term > last_log_term
+                || (args.last_log_term == last_log_term
+                    && args.last_log_index >= last_log_index))
+        {
+            rf.voted_for = Some(args.candidate_id);
+            // TODO: reset election timer.
+            // TODO: persist
+
+            RequestVoteReply {
+                term: args.term,
+                vote_granted: true,
+            }
+        } else {
+            RequestVoteReply {
+                term: args.term,
+                vote_granted: false,
+            }
         }
     }