1
0

29 Incheckningar a6da59518b ... 4b2264fd31

Upphovsman SHA1 Meddelande Datum
  Jing Yang 4b2264fd31 Update README and add a todo. 3 år sedan
  Jing Yang e1b955979d Add active-step-down to guarantee liveness. 3 år sedan
  Jing Yang 22b3c73eaf Refuse prevote if regularly receiving heartbeats. #10 3 år sedan
  Jing Yang 3f57270a5d Consolidate the two ways to become a follower. 3 år sedan
  Jing Yang d437abc009 Update Cargo.lock in durio. 2 år sedan
  Jing Yang 4d0f07bc7e Add a script to prepare a debian machine to cross build durio. 2 år sedan
  Jing Yang 847feb248e Fix durio build after the project structure change. 2 år sedan
  Jing Yang 4ab3c2a603 Use std::pin::pin instead of futures::pin_mut. 2 år sedan
  Jing Yang ff9eda55b8 Update tarpc and other dependencies of durio. 2 år sedan
  Jing Yang 375fa59ed2 Merge duplicate implementations of null persistor. 3 år sedan
  Jing Yang 74b991c3b9 Allow one RPC for each commit and each peer in RPC count tests. 3 år sedan
  Jing Yang 1a43d98384 Remove clippy allows that are no longer necessary. 3 år sedan
  Jing Yang 63fa6ea1f5 Update Cargo.lock in durio. 3 år sedan
  Jing Yang 48c988f35b Ignore uninlined_format_args warnings in tests. 3 år sedan
  Jing Yang 0bdd359955 Disable a few clippy lints and fix others. 3 år sedan
  Jing Yang b9a9591bec Optimize serialization of Vec<u8> and boost throughput to 150%. 3 år sedan
  Jing Yang 44774c320f Drop the Arc requirement of Persister. 3 år sedan
  Jing Yang d74fadfbce Conditionally execute debugging statement and increase throughput by 8%. 3 år sedan
  Jing Yang 0f650beb1e Remove unnecessary crate path in type names. 3 år sedan
  Jing Yang af1f3eb6ad Hard code test name in tokio tests. 3 år sedan
  Jing Yang 721185df85 Fix the log name conflict in persist_tests.rs. 3 år sedan
  Jing Yang c448196ffc Use one heartbeat task instead of N. 3 år sedan
  Jing Yang f9d2a15e8e Remove "me" from the raft peers array. 3 år sedan
  Jing Yang 17a7dfa1d2 Make sure the raft state occupies entire L3 cache. 3 år sedan
  Jing Yang e3ea70c318 Move `last_applied` to the apply_command daemon. 3 år sedan
  Jing Yang a6da59518b Update README and add a todo. 3 år sedan
  Jing Yang dae0a22fec Add active-step-down to guarantee liveness. 3 år sedan
  Jing Yang daffeabf0f Refuse prevote if regularly receiving heartbeats. #10 3 år sedan
  Jing Yang a9d54182dd Consolidate the two ways to become a follower. 3 år sedan

+ 1 - 0
Cargo.toml

@@ -25,6 +25,7 @@ parking_lot = "0.12"
 rand = "0.8"
 serde = "1.0"
 serde_derive = "1.0"
+serde_bytes = "0.11.9"
 tokio = { version = "1.7", features = ["net", "rt-multi-thread", "sync", "time", "parking_lot"] }
 test_utils = { path = "test_utils", optional = true }
 

Filskillnaden har hållits tillbaka eftersom den är för stor
+ 296 - 183
durio/Cargo.lock


+ 3 - 3
durio/build.sh

@@ -12,10 +12,10 @@
 set -ex
 
 MACHINE=gcloud
-rsync -av /Users/ditsing/Code/ruaft $MACHINE:~/compile/ --exclude 'ruaft/target' --exclude 'ruaft/.git' --exclude '.idea'
-ssh $MACHINE 'cd ~/compile/ruaft/durio && cargo build --target=armv7-unknown-linux-musleabihf --release'
+rsync -av /Users/ditsing/Code/ruaft $MACHINE:~/compile/ --exclude 'ruaft/target' --exclude 'ruaft/durio/target' --exclude 'ruaft/.git' --exclude '.idea'
+ssh $MACHINE 'cd ~/compile/ruaft/durio && $HOME/.cargo/bin/cargo build --target=armv7-unknown-linux-musleabihf --release'
 mkdir -p /tmp/ruaft
-rsync -av $MACHINE:'~/compile/ruaft/target/armv7-unknown-linux-musleabihf/release/durio' '/tmp/ruaft/durio'
+rsync -av $MACHINE:'~/compile/ruaft/durio/target/armv7-unknown-linux-musleabihf/release/durio' '/tmp/ruaft/durio'
 
 ssh alice 'pkill -9 durio || echo nothing'
 rsync -av '/tmp/ruaft/durio' alice:/tmp/durio

+ 12 - 0
durio/setup-build-debian.sh

@@ -0,0 +1,12 @@
+set -ex
+
+sudo apt-get update
+sudo apt-get install gcc-arm-linux-gnueabihf build-essential rsync
+curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
+~/.cargo/bin/rustup target add armv7-unknown-linux-musleabihf
+
+cat << EOF >> ~/.cargo/config
+
+[target.armv7-unknown-linux-musleabihf]
+linker = "arm-linux-gnueabihf-gcc"
+EOF

+ 1 - 1
durio/src/run.rs

@@ -19,7 +19,7 @@ pub(crate) async fn run_kv_instance(
         remote_rafts.push(LazyRaftServiceClient::new(raft_peer));
     }
 
-    let persister = Arc::new(DoNothingPersister::default());
+    let persister = DoNothingPersister::default();
 
     let kv_server = KVServer::new(remote_rafts, me, persister, None);
     let raft = kv_server.raft().clone();

+ 3 - 1
durio/src/utils.rs

@@ -19,7 +19,9 @@ pub(crate) fn context() -> tarpc::context::Context {
 
 pub(crate) fn translate_rpc_error(e: RpcError) -> std::io::Error {
     match e {
-        RpcError::Disconnected => std::io::Error::new(ErrorKind::BrokenPipe, e),
+        RpcError::Shutdown | RpcError::Send(_) | RpcError::Receive(_) => {
+            std::io::Error::new(ErrorKind::BrokenPipe, e)
+        }
         RpcError::DeadlineExceeded => {
             std::io::Error::new(ErrorKind::TimedOut, e)
         }

+ 1 - 4
kvraft/src/async_client.rs

@@ -107,10 +107,7 @@ impl AsyncClient {
                         // Do nothing.
                     }
                     Err(e) => {
-                        panic!(
-                            "Unexpected error with indefinite retry: {:?}",
-                            e
-                        );
+                        panic!("Unexpected error with indefinite retry: {e:?}");
                     }
                 };
             };

+ 1 - 1
kvraft/src/client.rs

@@ -111,7 +111,7 @@ impl ClerkInner {
                     self.unique_id = UniqueIdSequence::new();
                 }
                 Err(e) => {
-                    panic!("Unexpected error with indefinite retry: {:?}", e);
+                    panic!("Unexpected error with indefinite retry: {e:?}")
                 }
             };
         }

+ 1 - 1
kvraft/src/server.rs

@@ -111,7 +111,7 @@ impl KVServer {
     pub fn new(
         servers: Vec<impl RemoteRaft<UniqueKVOp> + 'static>,
         me: usize,
-        persister: Arc<dyn Persister>,
+        persister: impl Persister + 'static,
         max_state_size_bytes: Option<usize>,
     ) -> Arc<Self> {
         let (tx, rx) = channel();

+ 5 - 4
kvraft/src/snapshot_holder.rs

@@ -45,9 +45,10 @@ impl<T: Serialize> SnapshotHolder<T> {
 
 impl<T: DeserializeOwned> SnapshotHolder<T> {
     pub fn load_snapshot(&self, snapshot: Snapshot) -> T {
-        bincode::deserialize(&snapshot.data).expect(&*format!(
-            "Deserialization should never fail, {:?}",
-            &snapshot.data
-        ))
+        if let Ok(result) = bincode::deserialize(&snapshot.data) {
+            result
+        } else {
+            panic!("Deserialization should never fail, {:?}", snapshot.data)
+        }
     }
 }

+ 2 - 0
kvraft/tests/service_test.rs

@@ -1,3 +1,5 @@
+#![allow(clippy::uninlined_format_args)]
+
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 

+ 2 - 0
linearizability/src/lib.rs

@@ -195,6 +195,7 @@ mod tests {
     }
     #[test]
     fn no_accept() {
+        #[allow(clippy::box_default)]
         let ops = Box::leak(Box::new(vec![]));
         let start = Instant::now();
         for i in 0..4 {
@@ -210,6 +211,7 @@ mod tests {
 
     #[test]
     fn accept() {
+        #[allow(clippy::box_default)]
         let ops = Box::leak(Box::new(vec![]));
         let start = Instant::now();
         for i in 0..4 {

+ 7 - 6
src/apply_command.rs

@@ -57,10 +57,11 @@ impl<Command: ReplicableCommand> Raft<Command> {
         move || {
             log::info!("{:?} apply command daemon running ...", me);
 
+            let mut last_applied = 0;
             while keep_running.load(Ordering::Relaxed) {
                 let messages = {
                     let mut rf = rf.lock();
-                    if rf.last_applied >= rf.commit_index {
+                    if last_applied >= rf.commit_index {
                         // We have applied all committed log entries, wait until
                         // new log entries are committed.
                         condvar.wait_for(&mut rf, HEARTBEAT_INTERVAL);
@@ -69,17 +70,17 @@ impl<Command: ReplicableCommand> Raft<Command> {
                     // always smaller than or equal to commit index, as
                     // guaranteed by the SNAPSHOT_INDEX_INVARIANT.
                     assert!(rf.log.start() <= rf.commit_index);
-                    if rf.last_applied < rf.log.start() {
+                    if last_applied < rf.log.start() {
                         let (index_term, data) = rf.log.snapshot();
                         let messages =
                             vec![ApplyCommandMessage::Snapshot(Snapshot {
                                 last_included_index: index_term.index,
                                 data: data.to_vec(),
                             })];
-                        rf.last_applied = rf.log.start();
+                        last_applied = rf.log.start();
                         messages
-                    } else if rf.last_applied < rf.commit_index {
-                        let index = rf.last_applied + 1;
+                    } else if last_applied < rf.commit_index {
+                        let index = last_applied + 1;
                         let last_one = rf.commit_index + 1;
                         // This is safe because commit_index is always smaller
                         // than log.end(), see COMMIT_INDEX_INVARIANT.
@@ -95,7 +96,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                                 )
                             })
                             .collect();
-                        rf.last_applied = rf.commit_index;
+                        last_applied = rf.commit_index;
                         messages
                     } else {
                         continue;

+ 0 - 2
src/daemon_env.rs

@@ -158,7 +158,6 @@ impl DaemonEnv {
             voted_for: raft.voted_for,
             log: raft.log.all_index_term(),
             commit_index: raft.commit_index,
-            last_applied: raft.last_applied,
             state: raft.state,
             leader_id: raft.leader_id,
         }
@@ -172,7 +171,6 @@ struct StrippedRaftState {
     voted_for: Option<Peer>,
     log: Vec<IndexTerm>,
     commit_index: usize,
-    last_applied: usize,
     state: State,
     leader_id: Peer,
 }

+ 4 - 6
src/election.rs

@@ -362,12 +362,10 @@ impl<Command: ReplicableCommand> Raft<Command> {
     ) -> Vec<tokio::task::JoinHandle<Option<bool>>> {
         let mut votes = vec![];
         for peer in candidate.peers.clone().into_iter() {
-            if peer != candidate.me {
-                let one_vote = candidate
-                    .thread_pool
-                    .spawn(Self::request_vote(peer, args.clone()));
-                votes.push(one_vote);
-            }
+            let one_vote = candidate
+                .thread_pool
+                .spawn(Self::request_vote(peer, args.clone()));
+            votes.push(one_vote);
         }
         votes
     }

+ 20 - 21
src/heartbeats.rs

@@ -1,3 +1,4 @@
+use std::pin::pin;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -67,29 +68,27 @@ impl<Command: ReplicableCommand> Raft<Command> {
     /// The request message is a stripped down version of `AppendEntries`. The
     /// response from the peer is ignored.
     pub(crate) fn schedule_heartbeats(&self, interval: Duration) {
-        for peer in self.peers.clone().into_iter() {
-            if peer != self.me {
-                // rf is now owned by the outer async function.
-                let rf = self.inner_state.clone();
-                // A on-demand trigger to sending a heartbeat.
-                let mut trigger = self.heartbeats_daemon.sender.subscribe();
-                // Shutdown signal.
-                let keep_running = self.keep_running.clone();
-                self.thread_pool.spawn(async move {
-                    let mut interval = tokio::time::interval(interval);
-                    while keep_running.load(Ordering::Relaxed) {
-                        let tick = interval.tick();
-                        let trigger = trigger.recv();
-                        futures_util::pin_mut!(tick, trigger);
-                        let _ =
-                            futures_util::future::select(tick, trigger).await;
-                        if let Some(args) = Self::build_heartbeat(&rf) {
-                            tokio::spawn(Self::send_heartbeat(peer, args));
-                        }
+        // rf is now owned by the outer async function.
+        let rf = self.inner_state.clone();
+        // A on-demand trigger to sending a heartbeat.
+        let mut trigger = self.heartbeats_daemon.sender.subscribe();
+        // Shutdown signal.
+        let keep_running = self.keep_running.clone();
+        let peers = self.peers.clone();
+
+        self.thread_pool.spawn(async move {
+            let mut interval = tokio::time::interval(interval);
+            while keep_running.load(Ordering::Relaxed) {
+                let tick = pin!(interval.tick());
+                let trigger = pin!(trigger.recv());
+                let _ = futures_util::future::select(tick, trigger).await;
+                if let Some(args) = Self::build_heartbeat(&rf) {
+                    for peer in &peers {
+                        tokio::spawn(Self::send_heartbeat(*peer, args.clone()));
                     }
-                });
+                }
             }
-        }
+        });
     }
 
     fn build_heartbeat(

+ 3 - 0
src/lib.rs

@@ -1,3 +1,6 @@
+#![allow(clippy::unchecked_duration_subtraction)]
+#![allow(clippy::uninlined_format_args)]
+
 pub use crate::apply_command::ApplyCommandMessage;
 pub use crate::index_term::IndexTerm;
 pub use crate::log_array::Index;

+ 1 - 0
src/log_array.rs

@@ -46,6 +46,7 @@ pub struct LogEntry<Command> {
 #[derive(Clone, Serialize, Deserialize)]
 pub(crate) struct LogArray<C> {
     inner: Vec<LogEntry<C>>,
+    #[serde(with = "serde_bytes")]
     snapshot: Vec<u8>,
 }
 

+ 1 - 1
src/messages.rs

@@ -42,7 +42,7 @@ pub struct InstallSnapshotArgs {
     pub(crate) leader_id: Peer,
     pub(crate) last_included_index: Index,
     pub(crate) last_included_term: Term,
-    // TODO(ditsing): Serde cannot handle Vec<u8> as efficient as expected.
+    #[serde(with = "serde_bytes")]
     pub(crate) data: Vec<u8>,
     pub(crate) offset: usize,
     pub(crate) done: bool,

+ 29 - 3
src/raft.rs

@@ -1,8 +1,8 @@
-use crossbeam_utils::sync::WaitGroup;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
+use crossbeam_utils::sync::WaitGroup;
 use parking_lot::{Condvar, Mutex};
 use serde_derive::{Deserialize, Serialize};
 
@@ -60,7 +60,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
     pub fn new(
         peers: Vec<impl RemoteRaft<Command> + 'static>,
         me: usize,
-        persister: Arc<dyn Persister>,
+        persister: impl Persister + 'static,
         apply_command: impl ApplyCommandFnMut<Command>,
         max_state_size_bytes: Option<usize>,
         request_snapshot: impl RequestSnapshotFnMut,
@@ -95,6 +95,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         let election = Arc::new(ElectionState::create());
         election.reset_election_timer();
 
+        let persister = Arc::new(persister);
         let term_marker = TermMarker::create(
             inner_state.clone(),
             election.clone(),
@@ -133,7 +134,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             })
             .build()
             .expect("Creating thread pool should not fail");
-        let peers = (0..peer_size).map(Peer).collect();
+        let peers = (0..peer_size).filter(|p| *p != me).map(Peer).collect();
         let (sync_log_entries_comms, sync_log_entries_daemon) =
             crate::sync_log_entries::create(peer_size);
 
@@ -285,6 +286,11 @@ impl RaftJoinHandle {
 
 #[cfg(test)]
 mod tests {
+    use crate::utils::do_nothing::{DoNothingPersister, DoNothingRemoteRaft};
+    use crate::ApplyCommandMessage;
+
+    use super::*;
+
     #[test]
     fn test_raft_must_sync() {
         let optional_raft: Option<super::Raft<i32>> = None;
@@ -296,4 +302,24 @@ mod tests {
         // The following raft is not Sync.
         // let optional_raft: Option<super::Raft<std::rc::Rc<i32>>> = None;
     }
+
+    #[test]
+    fn test_no_me_in_peers() {
+        let peer_size = 5;
+        let me = 2;
+
+        let raft = Raft::new(
+            vec![DoNothingRemoteRaft {}; peer_size],
+            me,
+            DoNothingPersister {},
+            |_: ApplyCommandMessage<i32>| {},
+            None,
+            |_| {},
+        );
+
+        assert_eq!(4, raft.peers.len());
+        for peer in &raft.peers {
+            assert_ne!(peer.0, me);
+        }
+    }
 }

+ 1 - 2
src/raft_state.rs

@@ -17,13 +17,13 @@ pub(crate) struct FollowerData {
     last_heartbeat: Option<Instant>,
 }
 
+#[repr(align(64))]
 pub(crate) struct RaftState<Command> {
     pub current_term: Term,
     pub voted_for: Option<Peer>,
     pub log: LogArray<Command>,
 
     pub commit_index: Index,
-    pub last_applied: Index,
 
     pub match_index: Vec<Index>,
 
@@ -39,7 +39,6 @@ impl<Command> RaftState<Command> {
             voted_for: None,
             log: LogArray::create(),
             commit_index: 0,
-            last_applied: 0,
             match_index: vec![0; peer_size],
             state: State::Follower(FollowerData::default()),
             leader_id: me,

+ 2 - 47
src/remote_context.rs

@@ -101,62 +101,17 @@ mod tests {
     use std::panic::catch_unwind;
     use std::sync::Arc;
 
-    use async_trait::async_trait;
-    use bytes::Bytes;
     use parking_lot::Mutex;
 
     use crate::election::ElectionState;
     use crate::remote_peer::RemotePeer;
     use crate::term_marker::TermMarker;
+    use crate::utils::do_nothing::{DoNothingPersister, DoNothingRemoteRaft};
     use crate::verify_authority::VerifyAuthorityDaemon;
-    use crate::{
-        AppendEntriesArgs, AppendEntriesReply, InstallSnapshotArgs,
-        InstallSnapshotReply, Peer, Persister, RaftState, RemoteRaft,
-        RequestVoteArgs, RequestVoteReply,
-    };
+    use crate::{Peer, RaftState};
 
     use super::RemoteContext;
 
-    struct DoNothingPersister;
-    impl Persister for DoNothingPersister {
-        fn read_state(&self) -> Bytes {
-            Bytes::new()
-        }
-
-        fn save_state(&self, _bytes: Bytes) {}
-
-        fn state_size(&self) -> usize {
-            0
-        }
-
-        fn save_snapshot_and_state(&self, _: Bytes, _: &[u8]) {}
-    }
-
-    struct DoNothingRemoteRaft;
-    #[async_trait]
-    impl<Command: 'static + Send> RemoteRaft<Command> for DoNothingRemoteRaft {
-        async fn request_vote(
-            &self,
-            _args: RequestVoteArgs,
-        ) -> std::io::Result<RequestVoteReply> {
-            unimplemented!()
-        }
-
-        async fn append_entries(
-            &self,
-            _args: AppendEntriesArgs<Command>,
-        ) -> std::io::Result<AppendEntriesReply> {
-            unimplemented!()
-        }
-
-        async fn install_snapshot(
-            &self,
-            _args: InstallSnapshotArgs,
-        ) -> std::io::Result<InstallSnapshotReply> {
-            unimplemented!()
-        }
-    }
-
     #[test]
     fn test_context_api() {
         let rf = Arc::new(Mutex::new(RaftState::<i32>::create(1, Peer(0))));

+ 2 - 2
src/sync_log_entries.rs

@@ -34,7 +34,7 @@ impl Event {
 
 #[derive(Clone)]
 pub(crate) struct SyncLogEntriesComms {
-    tx: crate::utils::SharedSender<Event>,
+    tx: SharedSender<Event>,
 }
 
 impl SyncLogEntriesComms {
@@ -133,7 +133,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 }
                 for peer in this.peers.iter() {
                     let peer = *peer;
-                    if peer != this.me && event.should_schedule(peer) {
+                    if event.should_schedule(peer) {
                         let progress = &peer_progress[peer.0];
                         if let Event::NewTerm(_term, index) = event {
                             progress.reset_progress(index);

+ 52 - 0
src/utils/do_nothing.rs

@@ -0,0 +1,52 @@
+#![cfg(feature = "integration-test")]
+
+use async_trait::async_trait;
+use bytes::Bytes;
+
+use crate::{
+    AppendEntriesArgs, AppendEntriesReply, InstallSnapshotArgs,
+    InstallSnapshotReply, Persister, RemoteRaft, RequestVoteArgs,
+    RequestVoteReply,
+};
+
+#[derive(Clone)]
+pub struct DoNothingPersister;
+impl Persister for DoNothingPersister {
+    fn read_state(&self) -> Bytes {
+        Bytes::new()
+    }
+
+    fn save_state(&self, _bytes: Bytes) {}
+
+    fn state_size(&self) -> usize {
+        0
+    }
+
+    fn save_snapshot_and_state(&self, _: Bytes, _: &[u8]) {}
+}
+
+#[derive(Clone)]
+pub struct DoNothingRemoteRaft;
+#[async_trait]
+impl<Command: 'static + Send> RemoteRaft<Command> for DoNothingRemoteRaft {
+    async fn request_vote(
+        &self,
+        _args: RequestVoteArgs,
+    ) -> std::io::Result<RequestVoteReply> {
+        unimplemented!()
+    }
+
+    async fn append_entries(
+        &self,
+        _args: AppendEntriesArgs<Command>,
+    ) -> std::io::Result<AppendEntriesReply> {
+        unimplemented!()
+    }
+
+    async fn install_snapshot(
+        &self,
+        _args: InstallSnapshotArgs,
+    ) -> std::io::Result<InstallSnapshotReply> {
+        unimplemented!()
+    }
+}

+ 9 - 2
src/utils/integration_test.rs

@@ -1,9 +1,10 @@
 #![cfg(feature = "integration-test")]
 
 use crate::{
-    AppendEntriesArgs, AppendEntriesReply, IndexTerm, Peer, RequestVoteArgs,
-    RequestVoteReply, Term,
+    AppendEntriesArgs, AppendEntriesReply, IndexTerm, Peer, Persister, Raft,
+    RequestVoteArgs, RequestVoteReply, Term,
 };
+use std::sync::Arc;
 
 pub fn make_request_vote_args(
     term: Term,
@@ -50,3 +51,9 @@ pub fn unpack_append_entries_args<T>(
 pub fn unpack_append_entries_reply(reply: AppendEntriesReply) -> (Term, bool) {
     (reply.term, reply.success)
 }
+
+impl<Command> Raft<Command> {
+    pub fn persister(&self) -> Arc<dyn Persister> {
+        self.persister.clone()
+    }
+}

+ 1 - 0
src/utils/mod.rs

@@ -1,6 +1,7 @@
 pub use rpcs::{retry_rpc, RPC_DEADLINE};
 pub use shared_sender::SharedSender;
 
+pub mod do_nothing;
 pub mod integration_test;
 mod rpcs;
 mod shared_sender;

+ 0 - 2
src/verify_authority.rs

@@ -415,8 +415,6 @@ impl<Command: 'static + Send> Raft<Command> {
 
 #[cfg(test)]
 mod tests {
-    #![allow(clippy::needless_borrow)]
-
     use super::*;
 
     const PEER_SIZE: usize = 5;

+ 3 - 7
test_configs/src/interceptor/mod.rs

@@ -263,7 +263,6 @@ impl Config {
 
 pub fn make_config(server_count: usize, max_state: Option<usize>) -> Config {
     let (event_queue, clients) = make_grid_clients(server_count);
-    let persister = Arc::new(Persister::new());
     let mut kv_servers = vec![];
     let clients: Vec<Vec<&'static InterceptingRpcClient<UniqueKVOp>>> = clients
         .into_iter()
@@ -277,12 +276,9 @@ pub fn make_config(server_count: usize, max_state: Option<usize>) -> Config {
         })
         .collect();
     for (index, client_vec) in clients.iter().enumerate() {
-        let kv_server = KVServer::new(
-            client_vec.to_vec(),
-            index,
-            persister.clone(),
-            max_state,
-        );
+        let persister = Persister::new();
+        let kv_server =
+            KVServer::new(client_vec.to_vec(), index, persister, max_state);
         kv_servers.push(kv_server);
     }
 

+ 18 - 11
test_configs/src/kvraft/config.rs

@@ -19,7 +19,7 @@ pub struct Config {
     network: Arc<Mutex<labrpc::Network>>,
     server_count: usize,
     state: Mutex<ConfigState>,
-    storage: Mutex<Vec<Arc<Persister>>>,
+    storage: Mutex<Vec<Option<Persister>>>,
     maxraftstate: usize,
 }
 
@@ -52,7 +52,9 @@ impl Config {
             }
         }
 
-        let persister = self.storage.lock()[index].clone();
+        let persister = self.storage.lock()[index]
+            .take()
+            .expect("A persister must be present to create a raft server");
 
         let kv =
             KVServer::new(clients, index, persister, Some(self.maxraftstate));
@@ -150,14 +152,14 @@ impl Config {
             network.remove_server(Self::kv_server_name(index));
         }
 
-        let data = self.storage.lock()[index].read();
-
-        let persister = Arc::new(Persister::new());
-        self.storage.lock()[index] = persister.clone();
-        persister.restore(data);
-
         if let Some(kv_server) = self.state.lock().kv_servers[index].take() {
+            let persister = kv_server.raft().persister();
+            let data = Persister::downcast_unsafe(persister.as_ref()).read();
             kv_server.kill();
+
+            let persister = Persister::new();
+            persister.restore(data);
+            self.storage.lock()[index] = Some(persister);
         }
     }
 
@@ -254,8 +256,13 @@ impl Config {
         size_fn: impl Fn(&Persister) -> usize,
     ) -> Result<(), String> {
         let mut over_limits = String::new();
-        for (index, p) in self.storage.lock().iter().enumerate() {
-            let size = size_fn(p);
+        for (index, p) in self.state.lock().kv_servers.iter().enumerate() {
+            let p = p
+                .as_ref()
+                .expect("KV server must be running to check size")
+                .raft()
+                .persister();
+            let size = size_fn(Persister::downcast_unsafe(p.as_ref()));
             if size > upper {
                 let str = format!(" (index {}, size {})", index, size);
                 over_limits.push_str(&str);
@@ -299,7 +306,7 @@ pub fn make_config(
     let storage = Mutex::new(vec![]);
     storage
         .lock()
-        .resize_with(server_count, || Arc::new(Persister::new()));
+        .resize_with(server_count, || Some(Persister::new()));
 
     let cfg = Config {
         network,

+ 2 - 0
test_configs/src/lib.rs

@@ -1,3 +1,5 @@
+#![allow(clippy::uninlined_format_args)]
+
 pub mod interceptor;
 pub mod kvraft;
 mod persister;

+ 4 - 0
test_configs/src/persister.rs

@@ -59,4 +59,8 @@ impl Persister {
     pub fn snapshot_size(&self) -> usize {
         self.state.lock().snapshot.len()
     }
+
+    pub fn downcast_unsafe(trait_obj: &dyn ruaft::Persister) -> &Self {
+        unsafe { &*(trait_obj as *const dyn ruaft::Persister as *const Self) }
+    }
 }

+ 14 - 9
test_configs/src/raft/config.rs

@@ -22,7 +22,7 @@ struct LogState {
     committed_logs: Vec<Vec<i32>>,
     results: Vec<Result<()>>,
     max_index: usize,
-    saved: Vec<Arc<crate::Persister>>,
+    saved: Vec<Option<crate::Persister>>,
 }
 
 pub struct Config {
@@ -297,16 +297,19 @@ impl Config {
         // 4. Follower appended entries, replied to the leader. Note although
         // the follower is removed from the network, it can still send replies.
         // 5. The leader believes the entries are appended, but they are not.
-        let data = self.log.lock().saved[index].read_state();
+
         // Make sure to give up the log lock before calling external code, which
         // might directly or indirectly block on the log lock, e.g. through
         // the apply command function.
-        if let Some(raft) = raft {
-            raft.kill().join();
-        }
+        let Some(raft) = raft else { return };
+
+        let data = raft.persister().read_state();
+        raft.kill().join();
+
         let mut log = self.log.lock();
-        log.saved[index] = Arc::new(crate::Persister::new());
-        log.saved[index].save_state(data);
+        let persister = crate::Persister::new();
+        persister.save_state(data);
+        log.saved[index] = Some(persister);
     }
 
     pub fn start1(&self, index: usize) -> Result<()> {
@@ -324,7 +327,9 @@ impl Config {
                 )))
             }
         }
-        let persister = self.log.lock().saved[index].clone();
+        let persister = self.log.lock().saved[index]
+            .take()
+            .expect("A persister must be present to create a raft server");
 
         let log = self.log.clone();
         let raft = Raft::new(
@@ -488,7 +493,7 @@ pub fn make_config(
     });
 
     let mut saved = vec![];
-    saved.resize_with(server_count, || Arc::new(crate::Persister::new()));
+    saved.resize_with(server_count, || Some(crate::Persister::new()));
     let log = Arc::new(Mutex::new(LogState {
         committed_logs: vec![vec![]; server_count],
         results: vec![],

+ 2 - 20
test_configs/src/rpcs.rs

@@ -216,10 +216,7 @@ pub fn register_kv_server<
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
-
-    use bytes::Bytes;
-
+    use ruaft::utils::do_nothing::DoNothingPersister;
     use ruaft::utils::integration_test::{
         make_append_entries_args, make_request_vote_args,
         unpack_append_entries_reply, unpack_request_vote_reply,
@@ -228,21 +225,6 @@ mod tests {
 
     use super::*;
 
-    struct DoNothingPersister;
-    impl ruaft::Persister for DoNothingPersister {
-        fn read_state(&self) -> Bytes {
-            Bytes::new()
-        }
-
-        fn save_state(&self, _bytes: Bytes) {}
-
-        fn state_size(&self) -> usize {
-            0
-        }
-
-        fn save_snapshot_and_state(&self, _: Bytes, _: &[u8]) {}
-    }
-
     #[test]
     fn test_basic_message() -> std::io::Result<()> {
         test_utils::init_test_log!();
@@ -258,7 +240,7 @@ mod tests {
             let raft = Raft::new(
                 vec![RpcClient(client)],
                 0,
-                Arc::new(DoNothingPersister),
+                DoNothingPersister,
                 |_: ApplyCommandMessage<i32>| {},
                 None,
                 crate::utils::NO_SNAPSHOT,

+ 2 - 3
test_utils/src/logging.rs

@@ -36,8 +36,7 @@ pub fn init_log(module: &str) -> std::io::Result<PathBuf> {
         .take(10)
         .map(char::from)
         .collect();
-    let log_file_name =
-        format!("{}-{:010}-{}.log", module_file, timestamp, suffix);
+    let log_file_name = format!("{module_file}-{timestamp:010}-{suffix}.log");
 
     let log_dir = option_env!("LOG_DIR").unwrap_or(LOG_DIR);
     let mut path = PathBuf::from(log_dir);
@@ -49,7 +48,7 @@ pub fn init_log(module: &str) -> std::io::Result<PathBuf> {
     {
         let mut latest_path = path.clone();
         latest_path.pop();
-        latest_path.push(format!("{}-latest", module_file));
+        latest_path.push(format!("{module_file}-latest"));
         let _ = std::fs::remove_file(latest_path.as_path());
         #[cfg(unix)]
         let _ = std::os::unix::fs::symlink(path.as_path(), latest_path);

+ 2 - 1
tests/agreement_tests.rs

@@ -1,4 +1,5 @@
 #![allow(clippy::identity_op)]
+#![allow(clippy::uninlined_format_args)]
 
 use rand::{thread_rng, Rng};
 use scopeguard::defer;
@@ -311,7 +312,7 @@ fn count() -> config::Result<()> {
         }
 
         let diff = cfg.total_rpcs() - start_total;
-        if diff > ITERS + 1 + 3 {
+        if diff > (ITERS + 1 + 3) * (SERVERS - 1) {
             panic!("too many RPCs ({}) for {} entries", diff, ITERS);
         }
 

+ 8 - 7
tests/persist_tests.rs

@@ -301,9 +301,12 @@ fn figure8_unreliable() -> config::Result<()> {
     Ok(())
 }
 
-fn internal_churn(unreliable: bool) -> config::Result<()> {
+fn internal_churn(
+    unreliable: bool,
+    test_name: &'static str,
+) -> config::Result<()> {
     const SERVERS: usize = 5;
-    let cfg = Arc::new(make_config!(SERVERS, false));
+    let cfg = Arc::new(config::make_config(SERVERS, false, test_name));
     defer!(cfg.cleanup());
 
     if unreliable {
@@ -410,9 +413,7 @@ fn internal_churn(unreliable: bool) -> config::Result<()> {
     for cmd in all_cmds {
         assert!(
             consented.contains(&cmd),
-            "Cmd {} not found in {:?}",
-            cmd,
-            consented
+            "Cmd {cmd} not found in {consented:?}"
         );
     }
 
@@ -422,10 +423,10 @@ fn internal_churn(unreliable: bool) -> config::Result<()> {
 
 #[test]
 fn reliable_churn() -> config::Result<()> {
-    internal_churn(false)
+    internal_churn(false, stdext::function_name!())
 }
 
 #[test]
 fn unreliable_churn() -> config::Result<()> {
-    internal_churn(true)
+    internal_churn(true, stdext::function_name!())
 }

+ 4 - 3
tests/regression_tests.rs

@@ -5,11 +5,11 @@ use ruaft::utils::integration_test::{
     unpack_append_entries_args, unpack_append_entries_reply,
 };
 use test_configs::interceptor::{make_config, RaftRpcEvent};
-use test_utils::init_test_log;
 
 #[tokio::test(flavor = "multi_thread")]
 async fn smoke_test() {
-    init_test_log!();
+    test_utils::init_log("regression_tests-smoke_test")
+        .expect("Initializing test log should never fail");
     let server_count = 3;
     let config = make_config(server_count, None);
     let config = Arc::new(config);
@@ -56,7 +56,8 @@ async fn smoke_test() {
 
 #[tokio::test(flavor = "multi_thread")]
 async fn delayed_commit_consistency_test() {
-    init_test_log!();
+    test_utils::init_log("regression_tests-delayed_commit_consistency_test")
+        .expect("Initializing test log should never fail");
     let server_count = 3;
     let config = Arc::new(make_config(server_count, None));
 

+ 13 - 0
tests/snapshot_tests.rs

@@ -167,3 +167,16 @@ fn linearizability() {
         test_linearizability: true,
     });
 }
+
+#[ignore = "Large test with too many threads"]
+#[test]
+fn snapshot_throughput() {
+    init_test_log!();
+    generic_test(GenericTestParams {
+        // To boost client count to 48 we need more network threads in labrpc.
+        clients: 32,
+        crash: true,
+        maxraftstate: Some(10000),
+        ..Default::default()
+    })
+}

Vissa filer visades inte eftersom för många filer har ändrats