浏览代码

Move process_append_entries to its own file.

Jing Yang 4 年之前
父节点
当前提交
1407ea6e02
共有 2 个文件被更改,包括 89 次插入75 次删除
  1. 1 75
      src/lib.rs
  2. 88 0
      src/process_append_entries.rs

+ 1 - 75
src/lib.rs

@@ -36,6 +36,7 @@ mod index_term;
 mod install_snapshot;
 mod log_array;
 mod persister;
+mod process_append_entries;
 mod raft_state;
 pub mod rpcs;
 mod snapshot;
@@ -257,81 +258,6 @@ where
             }
         }
     }
-
-    pub(crate) fn process_append_entries(
-        &self,
-        args: AppendEntriesArgs<Command>,
-    ) -> AppendEntriesReply {
-        // Note: do not change this to `let _ = ...`.
-        let _guard = self.daemon_env.for_scope();
-
-        let mut rf = self.inner_state.lock();
-        if rf.current_term > args.term {
-            return AppendEntriesReply {
-                term: rf.current_term,
-                success: false,
-                committed: Some(rf.log.first_after(rf.commit_index).into()),
-            };
-        }
-
-        if rf.current_term < args.term {
-            rf.current_term = args.term;
-            rf.voted_for = None;
-            self.persister.save_state(rf.persisted_state().into());
-        }
-
-        rf.state = State::Follower;
-        rf.leader_id = args.leader_id;
-
-        self.election.reset_election_timer();
-
-        if rf.log.start() > args.prev_log_index
-            || rf.log.end() <= args.prev_log_index
-            || rf.log[args.prev_log_index].term != args.prev_log_term
-        {
-            return AppendEntriesReply {
-                term: args.term,
-                success: args.prev_log_index < rf.log.start(),
-                committed: Some(rf.log.first_after(rf.commit_index).into()),
-            };
-        }
-
-        for (i, entry) in args.entries.iter().enumerate() {
-            let index = i + args.prev_log_index + 1;
-            if rf.log.end() > index {
-                if rf.log[index].term != entry.term {
-                    check_or_record!(
-                        index > rf.commit_index,
-                        ErrorKind::RollbackCommitted(index),
-                        "Entries before commit index should never be rolled back",
-                        &rf
-                    );
-                    rf.log.truncate(index);
-                    rf.log.push(entry.clone());
-                }
-            } else {
-                rf.log.push(entry.clone());
-            }
-        }
-
-        self.persister.save_state(rf.persisted_state().into());
-
-        if args.leader_commit > rf.commit_index {
-            rf.commit_index = if args.leader_commit < rf.log.end() {
-                args.leader_commit
-            } else {
-                rf.log.last_index_term().index
-            };
-            self.apply_command_signal.notify_one();
-        }
-        self.snapshot_daemon.log_grow(rf.log.start(), rf.log.end());
-
-        AppendEntriesReply {
-            term: args.term,
-            success: true,
-            committed: None,
-        }
-    }
 }
 
 // Command must be

+ 88 - 0
src/process_append_entries.rs

@@ -0,0 +1,88 @@
+use crate::daemon_env::ErrorKind;
+use crate::{
+    check_or_record, AppendEntriesArgs, AppendEntriesReply, Raft, State,
+};
+
+// Command must be
+// 1. clone: they are copied to the persister.
+// 2. serialize: they are converted to bytes to persist.
+// 3. default: a default value is used as the first element of the log.
+impl<Command> Raft<Command>
+where
+    Command: Clone + serde::Serialize + Default,
+{
+    pub(crate) fn process_append_entries(
+        &self,
+        args: AppendEntriesArgs<Command>,
+    ) -> AppendEntriesReply {
+        // Note: do not change this to `let _ = ...`.
+        let _guard = self.daemon_env.for_scope();
+
+        let mut rf = self.inner_state.lock();
+        if rf.current_term > args.term {
+            return AppendEntriesReply {
+                term: rf.current_term,
+                success: false,
+                committed: Some(rf.log.first_after(rf.commit_index).into()),
+            };
+        }
+
+        if rf.current_term < args.term {
+            rf.current_term = args.term;
+            rf.voted_for = None;
+            self.persister.save_state(rf.persisted_state().into());
+        }
+
+        rf.state = State::Follower;
+        rf.leader_id = args.leader_id;
+
+        self.election.reset_election_timer();
+
+        if rf.log.start() > args.prev_log_index
+            || rf.log.end() <= args.prev_log_index
+            || rf.log[args.prev_log_index].term != args.prev_log_term
+        {
+            return AppendEntriesReply {
+                term: args.term,
+                success: args.prev_log_index < rf.log.start(),
+                committed: Some(rf.log.first_after(rf.commit_index).into()),
+            };
+        }
+
+        for (i, entry) in args.entries.iter().enumerate() {
+            let index = i + args.prev_log_index + 1;
+            if rf.log.end() > index {
+                if rf.log[index].term != entry.term {
+                    check_or_record!(
+                        index > rf.commit_index,
+                        ErrorKind::RollbackCommitted(index),
+                        "Entries before commit index should never be rolled back",
+                        &rf
+                    );
+                    rf.log.truncate(index);
+                    rf.log.push(entry.clone());
+                }
+            } else {
+                rf.log.push(entry.clone());
+            }
+        }
+
+        self.persister.save_state(rf.persisted_state().into());
+
+        if args.leader_commit > rf.commit_index {
+            rf.commit_index = if args.leader_commit < rf.log.end() {
+                args.leader_commit
+            } else {
+                rf.log.last_index_term().index
+            };
+            self.apply_command_signal.notify_one();
+        }
+        self.snapshot_daemon.log_grow(rf.log.start(), rf.log.end());
+
+        AppendEntriesReply {
+            term: args.term,
+            success: true,
+            committed: None,
+        }
+    }
+}