Selaa lähdekoodia

Implement append entries RPC serving.

Jing Yang 5 vuotta sitten
vanhempi
commit
d707b70546
2 muutettua tiedostoa jossa 68 lisäystä ja 4 poistoa
  1. 58 2
      src/lib.rs
  2. 10 2
      src/rpcs.rs

+ 58 - 2
src/lib.rs

@@ -46,6 +46,7 @@ impl Default for Peer {
     }
 }
 
+#[derive(Clone, Copy, Serialize, Deserialize)]
 struct LogEntry {
     term: Term,
     index: usize,
@@ -106,11 +107,17 @@ struct RequestVoteReply {
 #[derive(Serialize, Deserialize)]
 struct AppendEntriesArgs {
     term: Term,
+    leader_id: Peer,
+    prev_log_index: usize,
+    prev_log_term: Term,
+    entries: Vec<LogEntry>,
+    leader_commit: usize,
 }
 
 #[derive(Serialize, Deserialize)]
 struct AppendEntriesReply {
     term: Term,
+    success: bool,
 }
 
 impl Raft {
@@ -173,10 +180,59 @@ impl Raft {
 
     pub(crate) fn process_append_entries(
         &self,
-        request: AppendEntriesArgs,
+        args: AppendEntriesArgs,
     ) -> AppendEntriesReply {
+        let mut rf = self.inner_state.lock();
+        if rf.current_term > args.term {
+            return AppendEntriesReply {
+                term: rf.current_term,
+                success: false,
+            };
+        }
+
+        if rf.current_term < args.term {
+            rf.current_term = args.term;
+            rf.voted_for = None;
+        }
+
+        rf.state = State::Follower;
+        // TODO: reset election timer
+        // TODO: stop previous election
+        rf.leader_id = args.leader_id;
+
+        if rf.log.len() <= args.prev_log_index
+            || rf.log[args.prev_log_index].term != args.term
+        {
+            return AppendEntriesReply {
+                term: args.term,
+                success: false,
+            };
+        }
+
+        for (i, entry) in args.entries.iter().enumerate() {
+            let index = i + args.prev_log_index + 1;
+            if rf.log.len() > index {
+                if rf.log[index].term != entry.term {
+                    rf.log.truncate(index);
+                    rf.log.push(entry.clone());
+                }
+            } else {
+                rf.log.push(entry.clone());
+            }
+        }
+
+        if args.leader_commit > rf.commit_index {
+            rf.commit_index = if args.leader_commit < rf.log.len() {
+                args.leader_commit
+            } else {
+                rf.log.len() - 1
+            };
+            // TODO: apply commands.
+        }
+
         AppendEntriesReply {
-            term: Term(request.term.0 - 1),
+            term: args.term,
+            success: true,
         }
     }
 }

+ 10 - 2
src/rpcs.rs

@@ -145,11 +145,19 @@ mod tests {
         )?;
         assert_eq!(true, response.vote_granted);
 
-        let request = AppendEntriesArgs { term: Term(2021) };
+        let request = AppendEntriesArgs {
+            term: Term(2021),
+            leader_id: Default::default(),
+            prev_log_index: 0,
+            prev_log_term: Default::default(),
+            entries: vec![],
+            leader_commit: 0,
+        };
         let response = futures::executor::block_on(
             rpc_client.call_append_entries(0, request),
         )?;
-        assert_eq!(2020, response.term.0);
+        assert_eq!(2021, response.term.0);
+        assert_eq!(false, response.success);
 
         Ok(())
     }