Преглед на файлове

Use `IndexTerm` in public API.

Jing Yang преди 3 години
родител
ревизия
baf22fec60
променени са 10 файла, в които са добавени 26 реда и са изтрити 27 реда
  1. 5 5
      kvraft/src/server.rs
  2. 1 2
      src/daemon_env.rs
  3. 6 5
      src/index_term.rs
  4. 1 0
      src/lib.rs
  5. 1 2
      src/log_array.rs
  6. 1 2
      src/messages.rs
  7. 2 2
      src/process_append_entries.rs
  8. 3 3
      src/raft.rs
  9. 2 3
      src/sync_log_entries.rs
  10. 4 3
      test_configs/src/raft/config.rs

+ 5 - 5
kvraft/src/server.rs

@@ -377,11 +377,11 @@ impl KVServer {
                 unique_id,
             };
             let start = log_with!(self.logger, self.rf.start(op));
-            let start_term =
-                start.map_or(Self::UNSEEN_TERM, |(Term(term), _)| {
-                    Self::validate_term(term);
-                    term
-                });
+            let start_term = start.map_or(Self::UNSEEN_TERM, |index_term| {
+                let Term(term) = index_term.term;
+                Self::validate_term(term);
+                term
+            });
             let set = result_holder.term.compare_exchange(
                 Self::ATTEMPTING_TERM,
                 start_term,

+ 1 - 2
src/daemon_env.rs

@@ -6,8 +6,7 @@ use parking_lot::Mutex;
 #[cfg(all(not(test), feature = "integration-test"))]
 use test_utils::thread_local_logger::{self, LocalLogger};
 
-use crate::index_term::IndexTerm;
-use crate::{Peer, RaftState, State, Term};
+use crate::{IndexTerm, Peer, RaftState, State, Term};
 
 /// A convenient macro to record errors.
 #[macro_export]

+ 6 - 5
src/index_term.rs

@@ -4,7 +4,7 @@ use crate::log_array::LogEntry;
 use crate::{Index, Term};
 
 #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
-pub(crate) struct IndexTerm {
+pub struct IndexTerm {
     pub index: Index,
     pub term: Term,
 }
@@ -26,10 +26,7 @@ impl From<IndexTerm> for (Index, Term) {
 
 impl From<(Index, Term)> for IndexTerm {
     fn from(index_term: (Index, Term)) -> Self {
-        IndexTerm {
-            index: index_term.0,
-            term: index_term.1,
-        }
+        Self::pack(index_term.0, index_term.1)
     }
 }
 
@@ -37,4 +34,8 @@ impl IndexTerm {
     pub fn unpack(&self) -> (Index, Term) {
         (self.index, self.term)
     }
+
+    pub fn pack(index: Index, term: Term) -> Self {
+        Self { index, term }
+    }
 }

+ 1 - 0
src/lib.rs

@@ -1,4 +1,5 @@
 pub use crate::apply_command::ApplyCommandMessage;
+pub use crate::index_term::IndexTerm;
 pub use crate::log_array::Index;
 pub use crate::messages::*;
 pub use crate::persister::Persister;

+ 1 - 2
src/log_array.rs

@@ -1,7 +1,6 @@
 use serde_derive::{Deserialize, Serialize};
 
-use crate::index_term::IndexTerm;
-use crate::Term;
+use crate::{IndexTerm, Term};
 
 /// A log array that stores a tail of the whole Raft log.
 ///

+ 1 - 2
src/messages.rs

@@ -1,9 +1,8 @@
 use serde_derive::{Deserialize, Serialize};
 
-use crate::index_term::IndexTerm;
 use crate::log_array::LogEntry;
 use crate::raft::{Peer, Term};
-use crate::Index;
+use crate::{Index, IndexTerm};
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct RequestVoteArgs {

+ 2 - 2
src/process_append_entries.rs

@@ -1,7 +1,7 @@
 use crate::daemon_env::ErrorKind;
-use crate::index_term::IndexTerm;
 use crate::{
-    check_or_record, AppendEntriesArgs, AppendEntriesReply, Raft, State,
+    check_or_record, AppendEntriesArgs, AppendEntriesReply, IndexTerm, Raft,
+    State,
 };
 
 // Command must be

+ 3 - 3
src/raft.rs

@@ -13,7 +13,7 @@ use crate::heartbeats::{HeartbeatsDaemon, HEARTBEAT_INTERVAL_MILLIS};
 use crate::persister::PersistedRaftState;
 use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
 use crate::verify_authority::VerifyAuthorityDaemon;
-use crate::{utils, Index, Persister, RaftState, RemoteRaft};
+use crate::{utils, IndexTerm, Persister, RaftState, RemoteRaft};
 
 #[derive(
     Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize,
@@ -165,7 +165,7 @@ where
     /// Returns `None` if we are not the leader. The log entry may not have been
     /// committed to the log when this method returns. When and if it is
     /// committed, the `apply_command` callback will be called.
-    pub fn start(&self, command: Command) -> Option<(Term, Index)> {
+    pub fn start(&self, command: Command) -> Option<IndexTerm> {
         let mut rf = self.inner_state.lock();
         let term = rf.current_term;
         if !rf.is_leader() {
@@ -179,7 +179,7 @@ where
         let _ = self.new_log_entry.as_ref().unwrap().send(None);
 
         log::info!("{:?} started new entry at {} {:?}", self.me, index, term);
-        Some((term, index))
+        Some(IndexTerm::pack(index, term))
     }
 
     /// Cleanly shutdown this instance. This function never blocks forever. It

+ 2 - 3
src/sync_log_entries.rs

@@ -7,13 +7,12 @@ use parking_lot::{Condvar, Mutex};
 use crate::check_or_record;
 use crate::daemon_env::{Daemon, ErrorKind};
 use crate::heartbeats::HEARTBEAT_INTERVAL_MILLIS;
-use crate::index_term::IndexTerm;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
 use crate::verify_authority::DaemonBeatTicker;
 use crate::{
-    AppendEntriesArgs, InstallSnapshotArgs, Peer, Raft, RaftState, RemoteRaft,
-    Term,
+    AppendEntriesArgs, IndexTerm, InstallSnapshotArgs, Peer, Raft, RaftState,
+    RemoteRaft, Term,
 };
 
 #[repr(align(64))]

+ 4 - 3
test_configs/src/raft/config.rs

@@ -208,8 +208,8 @@ impl Config {
                 let state = self.state.lock();
                 if state.connected[cnt] {
                     if let Some(raft) = &state.rafts[cnt] {
-                        if let Some((_, index)) = raft.start(cmd) {
-                            first_index.replace(index);
+                        if let Some(index_term) = raft.start(cmd) {
+                            first_index.replace(index_term.index);
                         }
                     }
                 }
@@ -336,8 +336,9 @@ impl Config {
     ) -> Option<(usize, usize)> {
         self.state.lock().rafts[leader]
             .as_ref()
-            .map(|raft| raft.start(cmd).map(|(term, index)| (term.0, index)))
             .unwrap()
+            .start(cmd)
+            .map(|index_term| (index_term.term.0, index_term.index))
     }
 
     pub fn is_connected(&self, index: usize) -> bool {