瀏覽代碼

Remove "me" from the raft peers array.

Jing Yang 3 年之前
父節點
當前提交
f9d2a15e8e
共有 6 個文件被更改,包括 85 次插入56 次删除
  1. 4 6
      src/election.rs
  2. 27 2
      src/raft.rs
  3. 2 47
      src/remote_context.rs
  4. 1 1
      src/sync_log_entries.rs
  5. 50 0
      src/utils/do_nothing.rs
  6. 1 0
      src/utils/mod.rs

+ 4 - 6
src/election.rs

@@ -362,12 +362,10 @@ impl<Command: ReplicableCommand> Raft<Command> {
     ) -> Vec<tokio::task::JoinHandle<Option<bool>>> {
         let mut votes = vec![];
         for peer in candidate.peers.clone().into_iter() {
-            if peer != candidate.me {
-                let one_vote = candidate
-                    .thread_pool
-                    .spawn(Self::request_vote(peer, args.clone()));
-                votes.push(one_vote);
-            }
+            let one_vote = candidate
+                .thread_pool
+                .spawn(Self::request_vote(peer, args.clone()));
+            votes.push(one_vote);
         }
         votes
     }

+ 27 - 2
src/raft.rs

@@ -1,8 +1,8 @@
-use crossbeam_utils::sync::WaitGroup;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
+use crossbeam_utils::sync::WaitGroup;
 use parking_lot::{Condvar, Mutex};
 use serde_derive::{Deserialize, Serialize};
 
@@ -133,7 +133,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             })
             .build()
             .expect("Creating thread pool should not fail");
-        let peers = (0..peer_size).map(Peer).collect();
+        let peers = (0..peer_size).filter(|p| *p != me).map(Peer).collect();
         let (sync_log_entries_comms, sync_log_entries_daemon) =
             crate::sync_log_entries::create(peer_size);
 
@@ -283,6 +283,11 @@ impl RaftJoinHandle {
 
 #[cfg(test)]
 mod tests {
+    use crate::utils::do_nothing::{DoNothingPersister, DoNothingRemoteRaft};
+    use crate::ApplyCommandMessage;
+
+    use super::*;
+
     #[test]
     fn test_raft_must_sync() {
         let optional_raft: Option<super::Raft<i32>> = None;
@@ -294,4 +299,24 @@ mod tests {
         // The following raft is not Sync.
         // let optional_raft: Option<super::Raft<std::rc::Rc<i32>>> = None;
     }
+
+    #[test]
+    fn test_no_me_in_peers() {
+        let peer_size = 5;
+        let me = 2;
+
+        let raft = Raft::new(
+            vec![DoNothingRemoteRaft {}; peer_size],
+            me,
+            Arc::new(DoNothingPersister {}),
+            |_: ApplyCommandMessage<i32>| {},
+            None,
+            |_| {},
+        );
+
+        assert_eq!(4, raft.peers.len());
+        for peer in &raft.peers {
+            assert_ne!(peer.0, me);
+        }
+    }
 }

+ 2 - 47
src/remote_context.rs

@@ -101,62 +101,17 @@ mod tests {
     use std::panic::catch_unwind;
     use std::sync::Arc;
 
-    use async_trait::async_trait;
-    use bytes::Bytes;
     use parking_lot::Mutex;
 
     use crate::election::ElectionState;
     use crate::remote_peer::RemotePeer;
     use crate::term_marker::TermMarker;
+    use crate::utils::do_nothing::{DoNothingPersister, DoNothingRemoteRaft};
     use crate::verify_authority::VerifyAuthorityDaemon;
-    use crate::{
-        AppendEntriesArgs, AppendEntriesReply, InstallSnapshotArgs,
-        InstallSnapshotReply, Peer, Persister, RaftState, RemoteRaft,
-        RequestVoteArgs, RequestVoteReply,
-    };
+    use crate::{Peer, RaftState};
 
     use super::RemoteContext;
 
-    struct DoNothingPersister;
-    impl Persister for DoNothingPersister {
-        fn read_state(&self) -> Bytes {
-            Bytes::new()
-        }
-
-        fn save_state(&self, _bytes: Bytes) {}
-
-        fn state_size(&self) -> usize {
-            0
-        }
-
-        fn save_snapshot_and_state(&self, _: Bytes, _: &[u8]) {}
-    }
-
-    struct DoNothingRemoteRaft;
-    #[async_trait]
-    impl<Command: 'static + Send> RemoteRaft<Command> for DoNothingRemoteRaft {
-        async fn request_vote(
-            &self,
-            _args: RequestVoteArgs,
-        ) -> std::io::Result<RequestVoteReply> {
-            unimplemented!()
-        }
-
-        async fn append_entries(
-            &self,
-            _args: AppendEntriesArgs<Command>,
-        ) -> std::io::Result<AppendEntriesReply> {
-            unimplemented!()
-        }
-
-        async fn install_snapshot(
-            &self,
-            _args: InstallSnapshotArgs,
-        ) -> std::io::Result<InstallSnapshotReply> {
-            unimplemented!()
-        }
-    }
-
     #[test]
     fn test_context_api() {
         let rf = Arc::new(Mutex::new(RaftState::<i32>::create(1, Peer(0))));

+ 1 - 1
src/sync_log_entries.rs

@@ -133,7 +133,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 }
                 for peer in this.peers.iter() {
                     let peer = *peer;
-                    if peer != this.me && event.should_schedule(peer) {
+                    if event.should_schedule(peer) {
                         let progress = &peer_progress[peer.0];
                         if let Event::NewTerm(_term, index) = event {
                             progress.reset_progress(index);

+ 50 - 0
src/utils/do_nothing.rs

@@ -0,0 +1,50 @@
+use async_trait::async_trait;
+use bytes::Bytes;
+
+use crate::{
+    AppendEntriesArgs, AppendEntriesReply, InstallSnapshotArgs,
+    InstallSnapshotReply, Persister, RemoteRaft, RequestVoteArgs,
+    RequestVoteReply,
+};
+
+#[derive(Clone)]
+pub struct DoNothingPersister;
+impl Persister for DoNothingPersister {
+    fn read_state(&self) -> Bytes {
+        Bytes::new()
+    }
+
+    fn save_state(&self, _bytes: Bytes) {}
+
+    fn state_size(&self) -> usize {
+        0
+    }
+
+    fn save_snapshot_and_state(&self, _: Bytes, _: &[u8]) {}
+}
+
+#[derive(Clone)]
+pub struct DoNothingRemoteRaft;
+#[async_trait]
+impl<Command: 'static + Send> RemoteRaft<Command> for DoNothingRemoteRaft {
+    async fn request_vote(
+        &self,
+        _args: RequestVoteArgs,
+    ) -> std::io::Result<RequestVoteReply> {
+        unimplemented!()
+    }
+
+    async fn append_entries(
+        &self,
+        _args: AppendEntriesArgs<Command>,
+    ) -> std::io::Result<AppendEntriesReply> {
+        unimplemented!()
+    }
+
+    async fn install_snapshot(
+        &self,
+        _args: InstallSnapshotArgs,
+    ) -> std::io::Result<InstallSnapshotReply> {
+        unimplemented!()
+    }
+}

+ 1 - 0
src/utils/mod.rs

@@ -1,6 +1,7 @@
 pub use rpcs::{retry_rpc, RPC_DEADLINE};
 pub use shared_sender::SharedSender;
 
+pub mod do_nothing;
 pub mod integration_test;
 mod rpcs;
 mod shared_sender;