瀏覽代碼

Explicitly allow no-op and term-change log entries.

The message sent to the application now can contain no command at all.
Jing Yang 3 年之前
父節點
當前提交
098742095d
共有 6 個文件被更改,包括 57 次插入35 次删除
  1. 7 5
      kvraft/src/server.rs
  2. 10 3
      src/apply_command.rs
  3. 3 2
      src/election.rs
  4. 8 1
      src/lib.rs
  5. 28 23
      src/log_array.rs
  6. 1 1
      test_configs/src/raft/config.rs

+ 7 - 5
kvraft/src/server.rs

@@ -252,11 +252,13 @@ impl KVServer {
                             this.restore_state(state);
                         }
                         ApplyCommandMessage::Command(index, command) => {
-                            this.apply_op(
-                                command.unique_id,
-                                command.me,
-                                command.op,
-                            );
+                            if let Some(command) = command {
+                                this.apply_op(
+                                    command.unique_id,
+                                    command.me,
+                                    command.op,
+                                );
+                            }
                             this.process_read_requests(index);
                             if let Some(snapshot) = snapshot_holder
                                 .take_snapshot(&this.state.lock(), index)

+ 10 - 3
src/apply_command.rs

@@ -2,11 +2,11 @@ use std::sync::atomic::Ordering;
 use std::time::Duration;
 
 use crate::daemon_env::Daemon;
-use crate::{Index, Raft, Snapshot, HEARTBEAT_INTERVAL_MILLIS};
+use crate::{Index, LogEntryEnum, Raft, Snapshot, HEARTBEAT_INTERVAL_MILLIS};
 
 pub enum ApplyCommandMessage<Command> {
     Snapshot(Snapshot),
-    Command(Index, Command),
+    Command(Index, Option<Command>),
 }
 
 pub trait ApplyCommandFnMut<Command>:
@@ -99,9 +99,16 @@ where
                             .between(index, last_one)
                             .iter()
                             .map(|entry| {
+                                let command = match &entry.command {
+                                    LogEntryEnum::Command(command) => {
+                                        Some(command.clone())
+                                    }
+                                    LogEntryEnum::TermChange => None,
+                                    LogEntryEnum::Noop => None,
+                                };
                                 ApplyCommandMessage::Command(
                                     entry.index,
-                                    entry.command.clone(),
+                                    command,
                                 )
                             })
                             .collect();

+ 3 - 2
src/election.rs

@@ -10,7 +10,8 @@ use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
 use crate::verify_authority::VerifyAuthorityDaemon;
 use crate::{
-    Peer, Persister, Raft, RaftState, RemoteRaft, RequestVoteArgs, State, Term,
+    LogEntryEnum, Peer, Persister, Raft, RaftState, RemoteRaft,
+    RequestVoteArgs, State, Term,
 };
 
 #[derive(Default)]
@@ -380,7 +381,7 @@ where
 
             if rf.commit_index != rf.log.last_index_term().index {
                 rf.sentinel_commit_index =
-                    rf.log.add_command(term, Default::default());
+                    rf.log.add_entry(term, LogEntryEnum::TermChange);
                 persister.save_state(rf.persisted_state().into());
             } else {
                 rf.sentinel_commit_index = rf.commit_index;

+ 8 - 1
src/lib.rs

@@ -51,11 +51,18 @@ struct Peer(usize);
 
 pub type Index = usize;
 
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+enum LogEntryEnum<Command> {
+    TermChange,
+    Noop,
+    Command(Command),
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 struct LogEntry<Command> {
     index: Index,
     term: Term,
-    command: Command,
+    command: LogEntryEnum<Command>,
 }
 
 #[derive(Clone)]

+ 28 - 23
src/log_array.rs

@@ -1,7 +1,7 @@
 use serde_derive::{Deserialize, Serialize};
 
 use crate::index_term::IndexTerm;
-use crate::{Index, LogEntry, Term};
+use crate::{Index, LogEntry, LogEntryEnum, Term};
 
 /// A log array that stores a tail of the whole Raft log.
 ///
@@ -149,18 +149,23 @@ impl<C> LogArray<C> {
 
 // Mutations
 impl<C> LogArray<C> {
-    /// Add a new entry to the Raft log, with term and command. The new index is
-    /// returned.
-    pub fn add_command(&mut self, term: Term, command: C) -> Index {
+    /// Add a new entry to the Raft log. The new index is returned.
+    pub fn add_entry(&mut self, term: Term, entry: LogEntryEnum<C>) -> Index {
         let index = self.end();
         self.push(LogEntry {
             index,
             term,
-            command,
+            command: entry,
         });
         index
     }
 
+    /// Add a new entry to the Raft log, with term and command. The new index is
+    /// returned.
+    pub fn add_command(&mut self, term: Term, command: C) -> Index {
+        self.add_entry(term, LogEntryEnum::Command(command))
+    }
+
     /// Push a LogEntry into the Raft log. The index of the log entry must match
     /// the next index in the log.
     pub fn push(&mut self, log_entry: LogEntry<C>) {
@@ -292,12 +297,12 @@ impl<C> LogArray<C> {
     }
 }
 
-impl<C: Default> LogArray<C> {
+impl<C> LogArray<C> {
     fn build_first_entry(index: Index, term: Term) -> LogEntry<C> {
         LogEntry {
             index,
             term,
-            command: C::default(),
+            command: LogEntryEnum::Noop,
         }
     }
 }
@@ -326,7 +331,7 @@ mod tests {
             ret.push(LogEntry {
                 term: Term(i / 3),
                 index: i,
-                command: (end - i) as i32,
+                command: LogEntryEnum::Command((end - i) as i32),
             })
         }
 
@@ -347,7 +352,7 @@ mod tests {
 
         assert_eq!(1, log.end());
         assert_eq!((0, Term(0)), log.first_index_term().into());
-        assert_eq!(0, log[0].command);
+        assert_eq!(LogEntryEnum::Noop, log[0].command);
     }
 
     #[test]
@@ -392,19 +397,19 @@ mod tests {
         let last = log.at(end - 1);
         assert_eq!(end - 1, last.index);
         assert_eq!(5, last.term.0);
-        assert_eq!(1, last.command);
+        assert_eq!(LogEntryEnum::Command(1), last.command);
 
         let first = log.at(start);
         assert_eq!(start, first.index);
         assert_eq!(2, first.term.0);
-        assert_eq!(9, first.command);
+        assert_eq!(LogEntryEnum::Command(9), first.command);
 
         assert!(start < 12);
         assert!(end > 12);
         let middle = log.at(12);
         assert_eq!(12, middle.index);
         assert_eq!(4, middle.term.0);
-        assert_eq!(5, middle.command);
+        assert_eq!(LogEntryEnum::Command(5), middle.command);
 
         let at_before_start = catch_unwind(|| {
             log.at(start - 1);
@@ -480,7 +485,7 @@ mod tests {
         let (_, end, mut log) = default_log_array();
         let index = log.add_command(Term(8), 9);
         assert_eq!(8, log.at(index).term.0);
-        assert_eq!(9, log.at(index).command);
+        assert_eq!(LogEntryEnum::Command(9), log.at(index).command);
         assert_eq!(index, end);
         assert_eq!(index + 1, log.end());
     }
@@ -491,10 +496,10 @@ mod tests {
         log.push(LogEntry {
             term: Term(8),
             index: end,
-            command: 1,
+            command: LogEntryEnum::Command(1),
         });
         assert_eq!(8, log.at(end).term.0);
-        assert_eq!(1, log.at(end).command);
+        assert_eq!(LogEntryEnum::Command(1), log.at(end).command);
         assert_eq!(end + 1, log.end());
     }
 
@@ -505,7 +510,7 @@ mod tests {
         log.push(LogEntry {
             term: Term(8),
             index: end - 1,
-            command: 1,
+            command: LogEntryEnum::Command(1),
         });
     }
 
@@ -516,7 +521,7 @@ mod tests {
         log.push(LogEntry {
             term: Term(8),
             index: end + 1,
-            command: 1,
+            command: LogEntryEnum::Command(1),
         });
     }
 
@@ -572,7 +577,7 @@ mod tests {
         log.shift(offset, vec![]);
 
         assert_eq!((offset, Term(3)), log.first_index_term().into());
-        assert_eq!(0, log[offset].command);
+        assert_eq!(LogEntryEnum::Noop, log[offset].command);
 
         let all = log.all();
         assert_eq!(end - offset, all.len());
@@ -624,7 +629,7 @@ mod tests {
         assert_eq!(vec![1, 2], log.snapshot);
         assert_eq!(88, log[88].index);
         assert_eq!(99, log[88].term.0);
-        assert_eq!(0, log[88].command);
+        assert_eq!(LogEntryEnum::Noop, log[88].command);
 
         assert_eq!(end - start, dump.len());
     }
@@ -698,13 +703,13 @@ mod tests {
         log.push(LogEntry {
             term: Term(3),
             index: 2,
-            command: 3,
+            command: LogEntryEnum::Command(3),
         });
         log.add_command(Term(4), 20);
         log.push(LogEntry {
             term: Term(4),
             index: 4,
-            command: 7,
+            command: LogEntryEnum::Command(7),
         });
 
         for i in 0..100 {
@@ -718,7 +723,7 @@ mod tests {
 
         assert_eq!(8, log.at(8).index);
         assert_eq!(5, log[8].term.0);
-        assert_eq!(7, log[4].command);
+        assert_eq!(LogEntryEnum::Command(7), log[4].command);
 
         log.truncate(50);
         // End changed, start does not.
@@ -727,7 +732,7 @@ mod tests {
 
         assert_eq!((49, Term(5)), log.last_index_term().into());
         assert_eq!(49, log.at(49).index);
-        assert_eq!(44, log[49].command);
+        assert_eq!(LogEntryEnum::Command(44), log[49].command);
         assert_eq!(5, log.at(5).term.0);
         // Cannot assert 50 is out of range. log is mut and cannot be used in
         // catch_unwind().

+ 1 - 1
test_configs/src/raft/config.rs

@@ -393,7 +393,7 @@ impl Config {
     ) {
         let (index, command) =
             if let ApplyCommandMessage::Command(index, command) = message {
-                (index, command)
+                (index, command.unwrap_or(0))
             } else {
                 // Ignore snapshots.
                 return;