Forráskód Böngészése

Use generic types to allow different types of Command.

At the moment, Command needs to be Clone, Serializable and
Deserializable. It also needs to provide a default value
to be used in the first element of the log.

In addition, Command needs to be 'static, Send and Sync.
This is less optimal and might be changed in the future.

Squashed commit of the following:

commit 71dd764fa2c5155f8e5b5dcfeca8520a5ad07ec7
Author: Jing Yang <ditsing@gmail.com>
Date:   Tue Mar 30 20:17:58 2021 -0700

    Use generic type Command instead of a wrapper.

commit 6d5684c60ff236a15035533fabb59c467a2c0764
Author: Jing Yang <ditsing@gmail.com>
Date:   Tue Mar 30 20:08:55 2021 -0700

    Use one where clause.

commit 292bf5aa1f20aa7089e4a13f5aa84117975e4a1b
Author: Jing Yang <ditsing@gmail.com>
Date:   Tue Mar 30 20:05:43 2021 -0700

    Revert a change.

commit 3fd689afc542be1a7a4eb667a271f84d37af588b
Author: Jing Yang <ditsing@gmail.com>
Date:   Tue Mar 30 19:54:46 2021 -0700

    Remove unused line.

commit 59d3a86216e72059c8a038624d4ecccc1f5ae414
Author: Jing Yang <ditsing@gmail.com>
Date:   Tue Mar 30 19:01:42 2021 -0700

    Update comments.

commit db99f1dee3ef51ab0b2d7d73452918d618fdf5aa
Author: Jing Yang <ditsing@gmail.com>
Date:   Tue Mar 30 18:44:42 2021 -0700

    Refine requirements 2.

commit 366f0e557e8eb178060a400ccc5533a5dae7c940
Author: Jing Yang <ditsing@gmail.com>
Date:   Tue Mar 30 18:33:24 2021 -0700

    Refine require attempt 1.

commit 8d98e50fdf496af28998f832887622f2ab2807ce
Author: Jing Yang <ditsing@gmail.com>
Date:   Tue Mar 30 18:21:22 2021 -0700

    Run rustfmt.

commit fcb50ad4017bd63a8ebfa7a93f2ee520c4f394bf
Author: Jing Yang <ditsing@gmail.com>
Date:   Tue Mar 30 18:07:45 2021 -0700

    Make Raft fully generic.

commit cd72a928b6e5f74e6fa32fdde61dc712707cbfdb
Author: Jing Yang <ditsing@gmail.com>
Date:   Tue Mar 30 12:55:46 2021 -0700

    Make LogEntry generic.

commit 6a14f49187483232f65d83b65af52b136a4087f2
Author: Jing Yang <ditsing@gmail.com>
Date:   Tue Mar 30 12:54:46 2021 -0700

    Make command generic.

commit 8439b33bf4b3f34c6b0e2e1e97cbb49535f1e5f3
Author: Jing Yang <ditsing@gmail.com>
Date:   Mon Mar 29 20:32:06 2021 -0700

    Move RaftState to a separate file.
Jing Yang 5 éve
szülő
commit
2cf4d68306
5 módosított fájl, 92 hozzáadás és 47 törlés
  1. 52 19
      src/lib.rs
  2. 11 7
      src/persister.rs
  3. 6 4
      src/raft_state.rs
  4. 19 8
      src/rpcs.rs
  5. 4 9
      tests/config/mod.rs

+ 52 - 19
src/lib.rs

@@ -38,10 +38,7 @@ struct Peer(usize);
 pub type Index = usize;
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct Command(pub i32);
-
-#[derive(Clone, Debug, Serialize, Deserialize)]
-struct LogEntry {
+struct LogEntry<Command> {
     term: Term,
     index: Index,
     // TODO: Allow sending of arbitrary information.
@@ -56,8 +53,8 @@ struct ElectionState {
 }
 
 #[derive(Clone)]
-pub struct Raft {
-    inner_state: Arc<Mutex<RaftState>>,
+pub struct Raft<Command> {
+    inner_state: Arc<Mutex<RaftState<Command>>>,
     peers: Vec<Arc<RpcClient>>,
 
     me: Peer,
@@ -89,12 +86,12 @@ struct RequestVoteReply {
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
-struct AppendEntriesArgs {
+struct AppendEntriesArgs<Command> {
     term: Term,
     leader_id: Peer,
     prev_log_index: Index,
     prev_log_term: Term,
-    entries: Vec<LogEntry>,
+    entries: Vec<LogEntry<Command>>,
     leader_commit: Index,
 }
 
@@ -107,7 +104,24 @@ struct AppendEntriesReply {
 #[repr(align(64))]
 struct Opening(Arc<AtomicUsize>);
 
-impl Raft {
+// Commands must be
+// 0. 'static: they have to live long enough for thread pools.
+// 1. clone: they are put in vectors and request messages.
+// 2. serializable: they are sent over RPCs and persisted.
+// 3. deserializable: they are restored from storage.
+// 4. send: they are referenced in futures.
+// 5. sync: they are shared to other threads.
+// 6. default, because we need an element for the first entry.
+impl<Command> Raft<Command>
+where
+    Command: 'static
+        + Clone
+        + serde::Serialize
+        + serde::de::DeserializeOwned
+        + Send
+        + Sync
+        + Default,
+{
     /// Create a new raft instance.
     ///
     /// Each instance will create at least 3 + (number of peers) threads. The
@@ -128,7 +142,7 @@ impl Raft {
             log: vec![LogEntry {
                 term: Term(0),
                 index: 0,
-                command: Command(0),
+                command: Command::default(),
             }],
             commit_index: 0,
             last_applied: 0,
@@ -187,7 +201,15 @@ impl Raft {
         this.run_election_timer();
         this
     }
+}
 
+// Command must be
+// 1. clone: they are copied to the persister.
+// 2. serialize: they are converted to bytes to persist.
+impl<Command> Raft<Command>
+where
+    Command: Clone + serde::Serialize,
+{
     pub(crate) fn process_request_vote(
         &self,
         args: RequestVoteArgs,
@@ -239,7 +261,7 @@ impl Raft {
 
     pub(crate) fn process_append_entries(
         &self,
-        args: AppendEntriesArgs,
+        args: AppendEntriesArgs<Command>,
     ) -> AppendEntriesReply {
         let mut rf = self.inner_state.lock();
         if rf.current_term > args.term {
@@ -297,7 +319,18 @@ impl Raft {
             success: true,
         }
     }
+}
 
+// Command must be
+// 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
+// 1. clone: they are copied to the persister.
+// 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
+// 3. sync: AppendEntries<Command> are used across await points for RPC retries.
+// 4. serialize: they are converted to bytes to persist.
+impl<Command> Raft<Command>
+where
+    Command: 'static + Clone + Send + Sync + serde::Serialize,
+{
     fn run_election_timer(&self) -> std::thread::JoinHandle<()> {
         let this = self.clone();
         std::thread::spawn(move || {
@@ -456,7 +489,7 @@ impl Raft {
     async fn count_vote_util_cancelled(
         me: Peer,
         term: Term,
-        rf: Arc<Mutex<RaftState>>,
+        rf: Arc<Mutex<RaftState<Command>>>,
         votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
         cancel_token: futures_channel::oneshot::Receiver<()>,
         election: Arc<ElectionState>,
@@ -545,8 +578,8 @@ impl Raft {
     }
 
     fn build_heartbeat(
-        rf: &Arc<Mutex<RaftState>>,
-    ) -> Option<AppendEntriesArgs> {
+        rf: &Arc<Mutex<RaftState<Command>>>,
+    ) -> Option<AppendEntriesArgs<Command>> {
         let rf = rf.lock();
 
         if !rf.is_leader() {
@@ -568,7 +601,7 @@ impl Raft {
     const HEARTBEAT_RETRY: usize = 1;
     async fn send_heartbeat(
         rpc_client: Arc<RpcClient>,
-        args: AppendEntriesArgs,
+        args: AppendEntriesArgs<Command>,
     ) -> std::io::Result<()> {
         retry_rpc(Self::HEARTBEAT_RETRY, RPC_DEADLINE, |_round| {
             rpc_client.call_append_entries(args.clone())
@@ -624,7 +657,7 @@ impl Raft {
     }
 
     async fn sync_log_entry(
-        rf: Arc<Mutex<RaftState>>,
+        rf: Arc<Mutex<RaftState<Command>>>,
         rpc_client: Arc<RpcClient>,
         peer_index: usize,
         rerun: std::sync::mpsc::Sender<Option<Peer>>,
@@ -701,9 +734,9 @@ impl Raft {
     }
 
     fn build_append_entries(
-        rf: &Arc<Mutex<RaftState>>,
+        rf: &Arc<Mutex<RaftState<Command>>>,
         peer_index: usize,
-    ) -> Option<AppendEntriesArgs> {
+    ) -> Option<AppendEntriesArgs<Command>> {
         let rf = rf.lock();
         if !rf.is_leader() {
             return None;
@@ -723,7 +756,7 @@ impl Raft {
     const APPEND_ENTRIES_RETRY: usize = 1;
     async fn append_entries(
         rpc_client: &RpcClient,
-        args: AppendEntriesArgs,
+        args: AppendEntriesArgs<Command>,
     ) -> std::io::Result<Option<bool>> {
         let term = args.term;
         let reply =

+ 11 - 7
src/persister.rs

@@ -3,6 +3,8 @@ use std::convert::TryFrom;
 use bytes::Bytes;
 
 use crate::{LogEntry, Peer, RaftState, Term};
+use serde::de::DeserializeOwned;
+use serde::Serialize;
 
 pub trait Persister: Send + Sync {
     fn read_state(&self) -> Bytes;
@@ -10,20 +12,22 @@ pub trait Persister: Send + Sync {
 }
 
 #[derive(Serialize, Deserialize)]
-pub(crate) struct PersistedRaftState {
+pub(crate) struct PersistedRaftState<Command> {
     pub current_term: Term,
     pub voted_for: Option<Peer>,
-    pub log: Vec<LogEntry>,
+    pub log: Vec<LogEntry<Command>>,
 }
 
-impl<T: AsRef<RaftState>> From<T> for PersistedRaftState {
+impl<Command: Clone, T: AsRef<RaftState<Command>>> From<T>
+    for PersistedRaftState<Command>
+{
     fn from(raft_state: T) -> Self {
         Self::from(raft_state.as_ref())
     }
 }
 
-impl From<&RaftState> for PersistedRaftState {
-    fn from(raft_state: &RaftState) -> Self {
+impl<Command: Clone> From<&RaftState<Command>> for PersistedRaftState<Command> {
+    fn from(raft_state: &RaftState<Command>) -> Self {
         Self {
             current_term: raft_state.current_term,
             voted_for: raft_state.voted_for,
@@ -32,7 +36,7 @@ impl From<&RaftState> for PersistedRaftState {
     }
 }
 
-impl TryFrom<Bytes> for PersistedRaftState {
+impl<Command: DeserializeOwned> TryFrom<Bytes> for PersistedRaftState<Command> {
     type Error = bincode::Error;
 
     fn try_from(value: Bytes) -> Result<Self, Self::Error> {
@@ -40,7 +44,7 @@ impl TryFrom<Bytes> for PersistedRaftState {
     }
 }
 
-impl Into<Bytes> for PersistedRaftState {
+impl<Command: Serialize> Into<Bytes> for PersistedRaftState<Command> {
     fn into(self) -> Bytes {
         bincode::serialize(&self)
             .expect("Serialization should not fail")

+ 6 - 4
src/raft_state.rs

@@ -8,10 +8,10 @@ pub(crate) enum State {
     Leader,
 }
 
-pub(crate) struct RaftState {
+pub(crate) struct RaftState<Command> {
     pub current_term: Term,
     pub voted_for: Option<Peer>,
-    pub log: Vec<LogEntry>,
+    pub log: Vec<LogEntry<Command>>,
 
     pub commit_index: Index,
     pub last_applied: Index,
@@ -25,11 +25,13 @@ pub(crate) struct RaftState {
     pub leader_id: Peer,
 }
 
-impl RaftState {
-    pub fn persisted_state(&self) -> PersistedRaftState {
+impl<Command: Clone> RaftState<Command> {
+    pub fn persisted_state(&self) -> PersistedRaftState<Command> {
         self.into()
     }
+}
 
+impl<Command> RaftState<Command> {
     pub fn last_log_index_and_term(&self) -> (Index, Term) {
         let len = self.log.len();
         assert!(len > 0, "There should always be at least one entry in log");

+ 19 - 8
src/rpcs.rs

@@ -7,8 +7,13 @@ use crate::{
     AppendEntriesArgs, AppendEntriesReply, Raft, RequestVoteArgs,
     RequestVoteReply,
 };
+use serde::de::DeserializeOwned;
+use serde::Serialize;
 
-fn proxy_request_vote(raft: &Raft, data: RequestMessage) -> ReplyMessage {
+fn proxy_request_vote<Command: Clone + Serialize + DeserializeOwned>(
+    raft: &Raft<Command>,
+    data: RequestMessage,
+) -> ReplyMessage {
     let reply = raft.process_request_vote(
         bincode::deserialize(data.as_ref())
             .expect("Deserialization of requests should not fail"),
@@ -19,7 +24,10 @@ fn proxy_request_vote(raft: &Raft, data: RequestMessage) -> ReplyMessage {
             .expect("Serialization of reply should not fail"),
     )
 }
-fn proxy_append_entries(raft: &Raft, data: RequestMessage) -> ReplyMessage {
+fn proxy_append_entries<Command: Clone + Serialize + DeserializeOwned>(
+    raft: &Raft<Command>,
+    data: RequestMessage,
+) -> ReplyMessage {
     let reply = raft.process_append_entries(
         bincode::deserialize(data.as_ref())
             .expect("Deserialization should not fail"),
@@ -55,9 +63,9 @@ impl RpcClient {
             .expect("Deserialization of reply should not fail"))
     }
 
-    pub(crate) async fn call_append_entries(
+    pub(crate) async fn call_append_entries<Command: Serialize>(
         &self,
-        request: AppendEntriesArgs,
+        request: AppendEntriesArgs<Command>,
     ) -> std::io::Result<AppendEntriesReply> {
         let data = RequestMessage::from(
             bincode::serialize(&request)
@@ -72,8 +80,11 @@ impl RpcClient {
     }
 }
 
-pub fn register_server<S: AsRef<str>>(
-    raft: Arc<Raft>,
+pub fn register_server<
+    Command: 'static + Clone + Serialize + DeserializeOwned,
+    S: AsRef<str>,
+>(
+    raft: Arc<Raft<Command>>,
     name: S,
     network: &Mutex<Network>,
 ) -> std::io::Result<()> {
@@ -133,7 +144,7 @@ mod tests {
                 vec![RpcClient(client)],
                 0,
                 Arc::new(()),
-                |_, _| {},
+                |_, _: i32| {},
             ));
             register_server(raft, name, network.as_ref())?;
 
@@ -155,7 +166,7 @@ mod tests {
             futures::executor::block_on(rpc_client.call_request_vote(request))?;
         assert_eq!(true, response.vote_granted);
 
-        let request = AppendEntriesArgs {
+        let request = AppendEntriesArgs::<i32> {
             term: Term(2021),
             leader_id: Peer(0),
             prev_log_index: 0,

+ 4 - 9
tests/config/mod.rs

@@ -13,7 +13,7 @@ use ruaft::{Persister, Raft, RpcClient};
 pub mod persister;
 
 struct ConfigState {
-    rafts: Vec<Option<Raft>>,
+    rafts: Vec<Option<Raft<i32>>>,
     connected: Vec<bool>,
 }
 
@@ -204,9 +204,7 @@ impl Config {
                 let state = self.state.lock();
                 if state.connected[cnt] {
                     if let Some(raft) = &state.rafts[cnt] {
-                        if let Some((_, index)) =
-                            raft.start(ruaft::Command(cmd))
-                        {
+                        if let Some((_, index)) = raft.start(cmd) {
                             first_index.replace(index);
                         }
                     }
@@ -311,7 +309,7 @@ impl Config {
         let log_clone = self.log.clone();
         let raft =
             Raft::new(clients, index, persister, move |cmd_index, cmd| {
-                Self::apply_command(log_clone.clone(), index, cmd_index, cmd.0)
+                Self::apply_command(log_clone.clone(), index, cmd_index, cmd)
             });
         self.state.lock().rafts[index].replace(raft.clone());
 
@@ -328,10 +326,7 @@ impl Config {
     ) -> Option<(usize, usize)> {
         self.state.lock().rafts[leader]
             .as_ref()
-            .map(|raft| {
-                raft.start(ruaft::Command(cmd))
-                    .map(|(term, index)| (term.0, index))
-            })
+            .map(|raft| raft.start(cmd).map(|(term, index)| (term.0, index)))
             .unwrap()
     }