Quellcode durchsuchen

Explicitly list the requirement for commands.

Jing Yang vor 3 Jahren
Ursprung
Commit
7d8b309287
7 geänderte Dateien mit 37 neuen und 38 gelöschten Zeilen
  1. 3 5
      src/apply_command.rs
  2. 3 5
      src/election.rs
  3. 4 5
      src/heartbeats.rs
  4. 2 0
      src/lib.rs
  5. 5 16
      src/raft.rs
  6. 17 0
      src/replicable_command.rs
  7. 3 7
      src/sync_log_entries.rs

+ 3 - 5
src/apply_command.rs

@@ -2,7 +2,7 @@ use std::sync::atomic::Ordering;
 
 use crate::daemon_env::Daemon;
 use crate::heartbeats::HEARTBEAT_INTERVAL;
-use crate::{Index, Raft, Snapshot};
+use crate::{Index, Raft, ReplicableCommand, Snapshot};
 
 pub enum ApplyCommandMessage<Command> {
     Snapshot(Snapshot),
@@ -19,10 +19,8 @@ impl<Command, T: 'static + Send + FnMut(ApplyCommandMessage<Command>)>
 {
 }
 
-impl<Command> Raft<Command>
-where
-    Command: 'static + Clone + Send,
-{
+// Command: 'static + Clone + Send,
+impl<Command: ReplicableCommand> Raft<Command> {
     /// Runs a daemon thread that sends committed log entries to the
     /// application via a callback `apply_command`.
     ///

+ 3 - 5
src/election.rs

@@ -10,7 +10,8 @@ use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
 use crate::verify_authority::VerifyAuthorityDaemon;
 use crate::{
-    Peer, Persister, Raft, RaftState, RemoteRaft, RequestVoteArgs, State, Term,
+    Peer, Persister, Raft, RaftState, RemoteRaft, ReplicableCommand,
+    RequestVoteArgs, State, Term,
 };
 
 pub(crate) struct ElectionState {
@@ -69,10 +70,7 @@ impl ElectionState {
 // 1. clone: they are copied to the persister.
 // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
 // 3. serialize: they are converted to bytes to persist.
-impl<Command> Raft<Command>
-where
-    Command: 'static + Clone + Send + serde::Serialize,
-{
+impl<Command: ReplicableCommand> Raft<Command> {
     /// Runs the election timer daemon that triggers elections.
     ///
     /// The daemon holds a counter and an optional deadline in a mutex. Each

+ 4 - 5
src/heartbeats.rs

@@ -7,7 +7,9 @@ use parking_lot::Mutex;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, RPC_DEADLINE};
 use crate::verify_authority::DaemonBeatTicker;
-use crate::{AppendEntriesArgs, Raft, RaftState, RemoteRaft};
+use crate::{
+    AppendEntriesArgs, Raft, RaftState, RemoteRaft, ReplicableCommand,
+};
 
 pub(crate) const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(150);
 
@@ -55,10 +57,7 @@ impl HeartbeatsDaemon {
 // 1. clone: they are copied to the persister.
 // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
 // 3. serialize: they are converted to bytes to persist.
-impl<Command> Raft<Command>
-where
-    Command: 'static + Clone + Send + serde::Serialize,
-{
+impl<Command: ReplicableCommand> Raft<Command> {
     /// Schedules tasks that send heartbeats to peers.
     ///
     /// One task is scheduled for each peer. The task sleeps for a duration

+ 2 - 0
src/lib.rs

@@ -5,6 +5,7 @@ pub use crate::messages::*;
 pub use crate::persister::Persister;
 pub use crate::raft::{Raft, Term};
 pub use crate::remote_raft::RemoteRaft;
+pub use crate::replicable_command::ReplicableCommand;
 pub use crate::snapshot::Snapshot;
 pub use crate::verify_authority::VerifyAuthorityResult;
 
@@ -27,6 +28,7 @@ mod process_request_vote;
 mod raft;
 mod raft_state;
 mod remote_raft;
+mod replicable_command;
 mod snapshot;
 mod sync_log_entries;
 mod term_marker;

+ 5 - 16
src/raft.rs

@@ -12,7 +12,9 @@ use crate::heartbeats::{HeartbeatsDaemon, HEARTBEAT_INTERVAL};
 use crate::persister::PersistedRaftState;
 use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
 use crate::verify_authority::VerifyAuthorityDaemon;
-use crate::{utils, IndexTerm, Persister, RaftState, RemoteRaft};
+use crate::{
+    utils, IndexTerm, Persister, RaftState, RemoteRaft, ReplicableCommand,
+};
 
 #[derive(
     Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize,
@@ -45,17 +47,7 @@ pub struct Raft<Command> {
     pub(crate) stop_wait_group: WaitGroup,
 }
 
-// Commands must be
-// 0. 'static: they have to live long enough for thread pools.
-// 1. clone: they are put in vectors and request messages.
-// 2. serializable: they are sent over RPCs and persisted.
-// 3. deserializable: they are restored from storage.
-// 4. send: they are referenced in futures.
-impl<Command> Raft<Command>
-where
-    Command:
-        'static + Clone + serde::Serialize + serde::de::DeserializeOwned + Send,
-{
+impl<Command: ReplicableCommand> Raft<Command> {
     /// Create a new raft instance.
     ///
     /// Each instance will create at least 4 + (number of peers) threads. The
@@ -153,10 +145,7 @@ where
 // 1. clone: they are copied to the persister.
 // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
 // 3. serialize: they are converted to bytes to persist.
-impl<Command> Raft<Command>
-where
-    Command: 'static + Clone + Send + serde::Serialize,
-{
+impl<Command: ReplicableCommand> Raft<Command> {
     /// Adds a new command to the log, returns its index and the current term.
     ///
     /// Returns `None` if we are not the leader. The log entry may not have been

+ 17 - 0
src/replicable_command.rs

@@ -0,0 +1,17 @@
+/// A command must satisfy the requirement of this trait, so that it could be
+/// replicated in Raft.
+// Commands must be
+// 0. 'static: they have to live long enough for thread pools.
+// 1. clone: they are put in vectors and request messages.
+// 2. serializable: they are sent over RPCs and persisted.
+// 3. deserializable: they are restored from storage.
+// 4. send: they are referenced in futures.
+pub trait ReplicableCommand:
+    'static + Clone + serde::Serialize + serde::de::DeserializeOwned + Send
+{
+}
+
+impl<C> ReplicableCommand for C where
+    C: 'static + Clone + serde::Serialize + serde::de::DeserializeOwned + Send
+{
+}

+ 3 - 7
src/sync_log_entries.rs

@@ -3,15 +3,14 @@ use std::sync::Arc;
 
 use parking_lot::{Condvar, Mutex};
 
-use crate::check_or_record;
 use crate::daemon_env::{Daemon, ErrorKind};
 use crate::heartbeats::HEARTBEAT_INTERVAL;
 use crate::term_marker::TermMarker;
 use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
 use crate::verify_authority::DaemonBeatTicker;
 use crate::{
-    AppendEntriesArgs, IndexTerm, InstallSnapshotArgs, Peer, Raft, RaftState,
-    RemoteRaft, Term,
+    check_or_record, AppendEntriesArgs, IndexTerm, InstallSnapshotArgs, Peer,
+    Raft, RaftState, RemoteRaft, ReplicableCommand, Term,
 };
 
 #[repr(align(64))]
@@ -38,10 +37,7 @@ struct TaskNumber(usize);
 // 1. clone: they are copied to the persister.
 // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
 // 3. serialize: they are converted to bytes to persist.
-impl<Command> Raft<Command>
-where
-    Command: 'static + Clone + Send + serde::Serialize,
-{
+impl<Command: ReplicableCommand> Raft<Command> {
     /// Runs a daemon thread that syncs log entries to peers.
     ///
     /// This daemon watches the `new_log_entry` channel. Each item delivered by