Przeglądaj źródła

Move LogEntry and LogEntryEnum to log_array.rs

LogEntryEnum is now a private enum of log_array.rs.
Jing Yang 3 lat temu
rodzic
commit
e8b752a92f
4 zmienionych plików z 74 dodań i 32 usunięć
  1. 2 9
      src/apply_command.rs
  2. 2 4
      src/election.rs
  3. 2 16
      src/lib.rs
  4. 68 3
      src/log_array.rs

+ 2 - 9
src/apply_command.rs

@@ -2,7 +2,7 @@ use std::sync::atomic::Ordering;
 use std::time::Duration;
 
 use crate::daemon_env::Daemon;
-use crate::{Index, LogEntryEnum, Raft, Snapshot, HEARTBEAT_INTERVAL_MILLIS};
+use crate::{Index, Raft, Snapshot, HEARTBEAT_INTERVAL_MILLIS};
 
 pub enum ApplyCommandMessage<Command> {
     Snapshot(Snapshot),
@@ -99,16 +99,9 @@ where
                             .between(index, last_one)
                             .iter()
                             .map(|entry| {
-                                let command = match &entry.command {
-                                    LogEntryEnum::Command(command) => {
-                                        Some(command.clone())
-                                    }
-                                    LogEntryEnum::TermChange => None,
-                                    LogEntryEnum::Noop => None,
-                                };
                                 ApplyCommandMessage::Command(
                                     entry.index,
-                                    command,
+                                    entry.command().cloned(),
                                 )
                             })
                             .collect();

+ 2 - 4
src/election.rs

@@ -10,8 +10,7 @@ use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
 use crate::verify_authority::VerifyAuthorityDaemon;
 use crate::{
-    LogEntryEnum, Peer, Persister, Raft, RaftState, RemoteRaft,
-    RequestVoteArgs, State, Term,
+    Peer, Persister, Raft, RaftState, RemoteRaft, RequestVoteArgs, State, Term,
 };
 
 #[derive(Default)]
@@ -380,8 +379,7 @@ where
             verify_authority_daemon.reset_state(term);
 
             if rf.commit_index != rf.log.last_index_term().index {
-                rf.sentinel_commit_index =
-                    rf.log.add_entry(term, LogEntryEnum::TermChange);
+                rf.sentinel_commit_index = rf.log.add_term_change_entry(term);
                 persister.save_state(rf.persisted_state().into());
             } else {
                 rf.sentinel_commit_index = rf.commit_index;

+ 2 - 16
src/lib.rs

@@ -13,6 +13,8 @@ use crate::daemon_env::{DaemonEnv, ThreadEnv};
 use crate::election::ElectionState;
 use crate::heartbeats::HeartbeatsDaemon;
 use crate::index_term::IndexTerm;
+pub use crate::log_array::Index;
+pub(crate) use crate::log_array::LogEntry;
 use crate::persister::PersistedRaftState;
 pub use crate::persister::Persister;
 pub(crate) use crate::raft_state::RaftState;
@@ -49,22 +51,6 @@ pub struct Term(pub usize);
 #[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
 struct Peer(usize);
 
-pub type Index = usize;
-
-#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
-enum LogEntryEnum<Command> {
-    TermChange,
-    Noop,
-    Command(Command),
-}
-
-#[derive(Clone, Debug, Serialize, Deserialize)]
-struct LogEntry<Command> {
-    index: Index,
-    term: Term,
-    command: LogEntryEnum<Command>,
-}
-
 #[derive(Clone)]
 pub struct Raft<Command> {
     inner_state: Arc<Mutex<RaftState<Command>>>,

+ 68 - 3
src/log_array.rs

@@ -1,7 +1,7 @@
 use serde_derive::{Deserialize, Serialize};
 
 use crate::index_term::IndexTerm;
-use crate::{Index, LogEntry, LogEntryEnum, Term};
+use crate::Term;
 
 /// A log array that stores a tail of the whole Raft log.
 ///
@@ -26,7 +26,23 @@ use crate::{Index, LogEntry, LogEntryEnum, Term};
 /// index, a term and a snapshot.
 ///
 /// All APIs **will** panic if the given index(es) are out of bound.
-///
+
+pub type Index = usize;
+
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+enum LogEntryEnum<Command> {
+    TermChange,
+    Noop,
+    Command(Command),
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct LogEntry<Command> {
+    pub index: Index,
+    pub term: Term,
+    command: LogEntryEnum<Command>,
+}
+
 /// NOT THREAD SAFE.
 #[derive(Clone, Serialize, Deserialize)]
 pub(crate) struct LogArray<C> {
@@ -53,6 +69,16 @@ impl<C: Default> LogArray<C> {
     }
 }
 
+impl<C> LogEntry<C> {
+    pub fn command(&self) -> Option<&C> {
+        match &self.command {
+            LogEntryEnum::TermChange => None,
+            LogEntryEnum::Noop => None,
+            LogEntryEnum::Command(command) => Some(command),
+        }
+    }
+}
+
 // Log accessors
 impl<C> LogArray<C> {
     /// The start of the stored log entries. The command at this index is no
@@ -150,7 +176,7 @@ impl<C> LogArray<C> {
 // Mutations
 impl<C> LogArray<C> {
     /// Add a new entry to the Raft log. The new index is returned.
-    pub fn add_entry(&mut self, term: Term, entry: LogEntryEnum<C>) -> Index {
+    fn add_entry(&mut self, term: Term, entry: LogEntryEnum<C>) -> Index {
         let index = self.end();
         self.push(LogEntry {
             index,
@@ -166,6 +192,11 @@ impl<C> LogArray<C> {
         self.add_entry(term, LogEntryEnum::Command(command))
     }
 
+    /// Add a new term change entry to the Raft log. The new index is returned.
+    pub fn add_term_change_entry(&mut self, term: Term) -> Index {
+        self.add_entry(term, LogEntryEnum::TermChange)
+    }
+
     /// Push a LogEntry into the Raft log. The index of the log entry must match
     /// the next index in the log.
     pub fn push(&mut self, log_entry: LogEntry<C>) {
@@ -490,6 +521,16 @@ mod tests {
         assert_eq!(index + 1, log.end());
     }
 
+    #[test]
+    fn test_add_term_change() {
+        let (_, end, mut log) = default_log_array();
+        let index = log.add_term_change_entry(Term(8));
+        assert_eq!(8, log.at(index).term.0);
+        assert_eq!(LogEntryEnum::TermChange, log.at(index).command);
+        assert_eq!(index, end);
+        assert_eq!(index + 1, log.end());
+    }
+
     #[test]
     fn test_push() {
         let (_, end, mut log) = default_log_array();
@@ -824,4 +865,28 @@ mod tests {
             .expect_err("Validation should have failed");
         assert!(matches!(err, ValidationError::IndexMismatch(8, _)));
     }
+
+    #[test]
+    fn test_log_entry_command() {
+        let entry = LogEntry::<i32> {
+            index: 0,
+            term: Term(0),
+            command: LogEntryEnum::TermChange,
+        };
+        assert_eq!(None, entry.command());
+
+        let entry = LogEntry::<i32> {
+            index: 0,
+            term: Term(0),
+            command: LogEntryEnum::Noop,
+        };
+        assert_eq!(None, entry.command());
+
+        let entry = LogEntry::<i32> {
+            index: 0,
+            term: Term(0),
+            command: LogEntryEnum::Command(1),
+        };
+        assert_eq!(Some(1), entry.command().cloned());
+    }
 }