Эх сурвалжийг харах

Add an IndexTerm struct for LogArray.

Jing Yang 5 жил өмнө
parent
commit
de65b33c12
3 өөрчлөгдсөн 72 нэмэгдсэн , 31 устгасан
  1. 37 0
      src/index_term.rs
  2. 11 9
      src/lib.rs
  3. 24 22
      src/log_array.rs

+ 37 - 0
src/index_term.rs

@@ -0,0 +1,37 @@
+use crate::{Index, LogEntry, Term};
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub(crate) struct IndexTerm {
+    pub index: Index,
+    pub term: Term,
+}
+
+impl<C> From<&LogEntry<C>> for IndexTerm {
+    fn from(entry: &LogEntry<C>) -> Self {
+        Self {
+            index: entry.index,
+            term: entry.term,
+        }
+    }
+}
+
+impl From<IndexTerm> for (Index, Term) {
+    fn from(index_term: IndexTerm) -> Self {
+        index_term.unpack()
+    }
+}
+
+impl From<(Index, Term)> for IndexTerm {
+    fn from(index_term: (Index, Term)) -> Self {
+        IndexTerm {
+            index: index_term.0,
+            term: index_term.1,
+        }
+    }
+}
+
+impl IndexTerm {
+    pub fn unpack(&self) -> (Index, Term) {
+        (self.index, self.term)
+    }
+}

+ 11 - 9
src/lib.rs

@@ -23,6 +23,7 @@ pub(crate) use crate::raft_state::State;
 pub use crate::rpcs::RpcClient;
 use crate::utils::retry_rpc;
 
+mod index_term;
 mod log_array;
 mod persister;
 mod raft_state;
@@ -230,11 +231,11 @@ where
         }
 
         let voted_for = rf.voted_for;
-        let (last_log_index, last_log_term) = rf.log.last_index_term();
+        let last_log = rf.log.last_index_term();
         if (voted_for.is_none() || voted_for == Some(args.candidate_id))
-            && (args.last_log_term > last_log_term
-                || (args.last_log_term == last_log_term
-                    && args.last_log_index >= last_log_index))
+            && (args.last_log_term > last_log.term
+                || (args.last_log_term == last_log.term
+                    && args.last_log_index >= last_log.index))
         {
             rf.voted_for = Some(args.candidate_id);
 
@@ -306,7 +307,7 @@ where
             rf.commit_index = if args.leader_commit < rf.log.end() {
                 args.leader_commit
             } else {
-                rf.log.last_index_term().0
+                rf.log.last_index_term().index
             };
             self.apply_command_signal.notify_one();
         }
@@ -426,7 +427,8 @@ where
             self.persister.save_state(rf.persisted_state().into());
 
             let term = rf.current_term;
-            let (last_log_index, last_log_term) = rf.log.last_index_term();
+            let (last_log_index, last_log_term) =
+                rf.log.last_index_term().unpack();
 
             (
                 term,
@@ -585,12 +587,12 @@ where
             return None;
         }
 
-        let (last_log_index, last_log_term) = rf.log.last_index_term();
+        let last_log = rf.log.last_index_term();
         let args = AppendEntriesArgs {
             term: rf.current_term,
             leader_id: rf.leader_id,
-            prev_log_index: last_log_index,
-            prev_log_term: last_log_term,
+            prev_log_index: last_log.index,
+            prev_log_term: last_log.term,
             entries: vec![],
             leader_commit: rf.commit_index,
         };

+ 24 - 22
src/log_array.rs

@@ -1,3 +1,4 @@
+use crate::index_term::IndexTerm;
 use crate::{Index, LogEntry, Term};
 
 /// A log array that stores a tail of the whole Raft log.
@@ -63,15 +64,13 @@ impl<C> LogArray<C> {
     }
 
     /// The first index and term stored in this log array.
-    pub fn first_index_term(&self) -> (Index, Term) {
-        let first_entry = self.first_entry();
-        (first_entry.index, first_entry.term)
+    pub fn first_index_term(&self) -> IndexTerm {
+        self.first_entry().into()
     }
 
     /// The last index and term of the Raft log.
-    pub fn last_index_term(&self) -> (Index, Term) {
-        let last_entry = self.last_entry();
-        (last_entry.index, last_entry.term)
+    pub fn last_index_term(&self) -> IndexTerm {
+        self.last_entry().into()
     }
 
     /// The log entry at the given index.
@@ -100,7 +99,7 @@ impl<C> LogArray<C> {
 
     /// The snapshot before and including `start()`.
     #[allow(dead_code)]
-    pub fn snapshot(&self) -> ((Index, Term), &bytes::Bytes) {
+    pub fn snapshot(&self) -> (IndexTerm, &bytes::Bytes) {
         (self.first_index_term(), &self.snapshot)
     }
 }
@@ -168,11 +167,11 @@ impl<C: Default> LogArray<C> {
         // Override the first entry, we know there is at least one entry. This
         // is not strictly needed. One benefit is that the command can be
         // released after this point.
-        let (first_index, first_term) = self.first_index_term();
-        self.inner[0] = Self::build_first_entry(first_index, first_term);
+        let first = self.first_index_term();
+        self.inner[0] = Self::build_first_entry(first.index, first.term);
 
         assert_eq!(
-            first_index, index,
+            first.index, index,
             "Expecting the first entry to have the same index."
         );
 
@@ -306,7 +305,7 @@ mod tests {
         log.check_one_element();
 
         assert_eq!(1, log.end());
-        assert_eq!((0, Term(0)), log.first_index_term());
+        assert_eq!((0, Term(0)), log.first_index_term().into());
         assert_eq!(0, log[0].command);
     }
 
@@ -335,10 +334,13 @@ mod tests {
     #[test]
     fn test_accessors() {
         let (start, end, log) = default_log_array();
-        assert_eq!((start, Term(2)), log.first_index_term());
-        assert_eq!((end - 1, Term(5)), log.last_index_term());
+        assert_eq!((start, Term(2)), log.first_index_term().into());
+        assert_eq!((end - 1, Term(5)), log.last_index_term().into());
         assert_eq!(
-            ((start, Term(2)), &bytes::Bytes::from_static(&[1, 2, 3])),
+            (
+                (start, Term(2)).into(),
+                &bytes::Bytes::from_static(&[1, 2, 3])
+            ),
             log.snapshot()
         );
 
@@ -526,7 +528,7 @@ mod tests {
 
         log.shift(offset, bytes::Bytes::new());
 
-        assert_eq!((offset, Term(3)), log.first_index_term());
+        assert_eq!((offset, Term(3)), log.first_index_term().into());
         assert_eq!(0, log[offset].command);
 
         let all = log.all();
@@ -668,8 +670,8 @@ mod tests {
         assert_eq!(0, log.start());
         assert_eq!(105, log.end());
 
-        assert_eq!((0, Term(0)), log.first_index_term());
-        assert_eq!((104, Term(5)), log.last_index_term());
+        assert_eq!((0, Term(0)), log.first_index_term().into());
+        assert_eq!((104, Term(5)), log.last_index_term().into());
 
         assert_eq!(8, log.at(8).index);
         assert_eq!(5, log[8].term.0);
@@ -680,7 +682,7 @@ mod tests {
         assert_eq!(0, log.start());
         assert_eq!(50, log.end());
 
-        assert_eq!((49, Term(5)), log.last_index_term());
+        assert_eq!((49, Term(5)), log.last_index_term().into());
         assert_eq!(49, log.at(49).index);
         assert_eq!(44, log[49].command);
         assert_eq!(5, log.at(5).term.0);
@@ -689,7 +691,7 @@ mod tests {
 
         // Snapshot is not changed.
         assert_eq!(
-            ((0, Term(0)), &bytes::Bytes::from_static(&[1, 2, 3])),
+            ((0, Term(0)).into(), &bytes::Bytes::from_static(&[1, 2, 3])),
             log.snapshot()
         );
 
@@ -697,14 +699,14 @@ mod tests {
         // Start changed, end did not;
         assert_eq!(5, log.start());
 
-        assert_eq!((5, Term(5)), log.first_index_term());
+        assert_eq!((5, Term(5)), log.first_index_term().into());
         assert_eq!(5, log.at(5).index);
         assert_eq!(5, log.at(5).term.0);
         // Cannot assert 4 is out of range. log is mut and cannot be used in
         // catch_unwind().
 
         // Snapshot is changed, too.
-        assert_eq!(((5, Term(5)), &bytes::Bytes::new()), log.snapshot());
+        assert_eq!(((5, Term(5)).into(), &bytes::Bytes::new()), log.snapshot());
 
         // Ranged accessors.
         assert_eq!(45, log.all().len());
@@ -717,7 +719,7 @@ mod tests {
         assert_eq!(1, log.all().len());
         assert_eq!(log.first_index_term(), log.last_index_term());
         assert_eq!(
-            ((9, Term(7)), &bytes::Bytes::from_static(&[7, 8, 9])),
+            ((9, Term(7)).into(), &bytes::Bytes::from_static(&[7, 8, 9])),
             log.snapshot()
         );
     }