Kaynağa Gözat

Replace persister with storage.

Jing Yang 2 yıl önce
ebeveyn
işleme
5fa583a055

+ 7 - 15
kvraft/src/server.rs

@@ -9,9 +9,9 @@ use futures::FutureExt;
 use parking_lot::Mutex;
 use serde_derive::{Deserialize, Serialize};
 
+use ruaft::storage::RaftStorageTrait;
 use ruaft::{
-    ApplyCommandMessage, Index, Persister, Raft, RemoteRaft, Term,
-    VerifyAuthorityResult,
+    ApplyCommandMessage, Index, Raft, RemoteRaft, Term, VerifyAuthorityResult,
 };
 #[cfg(all(not(test), feature = "integration-test"))]
 use test_utils::thread_local_logger::LocalLogger;
@@ -111,8 +111,7 @@ impl KVServer {
     pub fn new(
         servers: Vec<impl RemoteRaft<UniqueKVOp> + 'static>,
         me: usize,
-        persister: impl Persister + 'static,
-        max_state_size_bytes: Option<usize>,
+        storage: impl RaftStorageTrait,
     ) -> Arc<Self> {
         let (tx, rx) = channel();
         let apply_command = move |message| {
@@ -123,17 +122,10 @@ impl KVServer {
         let ret = Arc::new(Self {
             me,
             state: Mutex::new(KVServerState::default()),
-            rf: Raft::new(
-                servers,
-                me,
-                persister,
-                apply_command,
-                max_state_size_bytes,
-                {
-                    let snapshot_holder = snapshot_holder.clone();
-                    move |index| snapshot_holder.request_snapshot(index)
-                },
-            ),
+            rf: Raft::new(servers, me, storage, apply_command, {
+                let snapshot_holder = snapshot_holder.clone();
+                move |index| snapshot_holder.request_snapshot(index)
+            }),
             keep_running: AtomicBool::new(true),
         });
         ret.process_command(snapshot_holder, rx);

+ 9 - 8
src/election.rs

@@ -6,12 +6,13 @@ use parking_lot::{Condvar, Mutex};
 use rand::{thread_rng, Rng};
 
 use crate::remote_context::RemoteContext;
+use crate::storage::SharedLogPersister;
 use crate::sync_log_entries::SyncLogEntriesComms;
 use crate::utils::{retry_rpc, RPC_DEADLINE};
 use crate::verify_authority::VerifyAuthorityDaemon;
 use crate::{
-    IndexTerm, Peer, Persister, Raft, RaftState, ReplicableCommand,
-    RequestVoteArgs, State, Term,
+    IndexTerm, Peer, Raft, RaftState, ReplicableCommand, RequestVoteArgs,
+    State, Term,
 };
 
 struct VersionedDeadline {
@@ -82,7 +83,7 @@ struct ElectionCandidate<Command> {
     election: Arc<ElectionState>,
     new_log_entry: SyncLogEntriesComms,
     verify_authority_daemon: VerifyAuthorityDaemon,
-    persister: Arc<dyn Persister>,
+    persister: SharedLogPersister<Command>,
     thread_pool: tokio::runtime::Handle,
 }
 
@@ -95,9 +96,8 @@ enum QuorumOrCancelled {
 
 // Command must be
 // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
-// 1. clone: they are copied to the persister.
-// 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
-// 3. serialize: they are converted to bytes to persist.
+// 1. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
+// 2. serialize: they are converted to bytes to persist.
 impl<Command: ReplicableCommand> Raft<Command> {
     /// Runs the election timer daemon that triggers elections.
     ///
@@ -317,7 +317,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             rf.voted_for = Some(me);
             rf.state = State::Candidate;
 
-            candidate.persister.save_state(rf.persisted_state().into());
+            candidate.persister.save_term_vote(&rf);
 
             (rf.current_term, rf.log.last_index_term().unpack())
         };
@@ -431,7 +431,8 @@ impl<Command: ReplicableCommand> Raft<Command> {
             if rf.commit_index != rf.log.last_index_term().index {
                 // Create a sentinel commit at the start of the term.
                 sentinel_commit_index = rf.log.add_term_change_entry(term);
-                this.persister.save_state(rf.persisted_state().into());
+                this.persister
+                    .append_one_entry(rf.log.at(sentinel_commit_index));
             } else {
                 sentinel_commit_index = rf.commit_index;
             }

+ 12 - 9
src/process_append_entries.rs

@@ -5,9 +5,8 @@ use crate::{
 };
 
 // Command must be
-// 1. clone: they are copied to the persister.
-// 2. serialize: they are converted to bytes to persist.
-impl<Command: Clone + serde::Serialize> Raft<Command> {
+// 1. serialize: they are converted to bytes to persist.
+impl<Command: serde::Serialize> Raft<Command> {
     pub fn process_append_entries(
         &self,
         args: AppendEntriesArgs<Command>,
@@ -27,7 +26,7 @@ impl<Command: Clone + serde::Serialize> Raft<Command> {
         if rf.current_term < args.term {
             rf.current_term = args.term;
             rf.voted_for = None;
-            self.persister.save_state(rf.persisted_state().into());
+            self.persister.save_term_vote(&rf);
         }
 
         rf.state = State::Follower;
@@ -73,7 +72,8 @@ impl<Command: Clone + serde::Serialize> Raft<Command> {
 
         // COMMIT_INDEX_INVARIANT: Before this loop, we can safely assume that
         // commit_index < log.end().
-        for (i, entry) in args.entries.iter().enumerate() {
+        let mut first_mismatch = rf.log.end();
+        for (i, entry) in args.entries.into_iter().enumerate() {
             let index = i + args.prev_log_index + 1;
             if rf.log.end() > index {
                 if rf.log.at(index).term != entry.term {
@@ -87,20 +87,23 @@ impl<Command: Clone + serde::Serialize> Raft<Command> {
                     // checked that index is strictly larger than commit_index.
                     // The condition that commit_index < log.end() holds.
                     rf.log.truncate(index);
-                    rf.log.push(entry.clone());
+                    rf.log.push(entry);
+                    first_mismatch = std::cmp::min(first_mismatch, index);
                 }
                 // COMMIT_INDEX_INVARIANT: Otherwise log.end() does not move.
                 // The condition that commit_index < log.end() holds.
             } else {
                 // COMMIT_INDEX_INVARIANT: log.end() grows larger. The condition
                 // that commit_index < log.end() holds.
-                rf.log.push(entry.clone());
+                rf.log.push(entry);
+                first_mismatch = std::cmp::min(first_mismatch, index);
             }
         }
         // COMMIT_INDEX_INVARIANT: After the loop, commit_index < log.end()
         // must still hold.
-
-        self.persister.save_state(rf.persisted_state().into());
+        if first_mismatch <= rf.log.end() {
+            self.persister.append_entries(rf.log.after(first_mismatch));
+        }
 
         // SNAPSHOT_INDEX_INVARIANT: commit_index increases (or stays unchanged)
         // after this if-statement. log.start() <= commit_index still holds.

+ 4 - 6
src/process_install_snapshot.rs

@@ -2,7 +2,7 @@ use crate::check_or_record;
 use crate::daemon_env::ErrorKind;
 use crate::{InstallSnapshotArgs, InstallSnapshotReply, Raft, State};
 
-impl<C: Clone + serde::Serialize> Raft<C> {
+impl<C> Raft<C> {
     pub fn process_install_snapshot(
         &self,
         args: InstallSnapshotArgs,
@@ -25,7 +25,7 @@ impl<C: Clone + serde::Serialize> Raft<C> {
         if rf.current_term < args.term {
             rf.current_term = args.term;
             rf.voted_for = None;
-            self.persister.save_state(rf.persisted_state().into());
+            self.persister.save_term_vote(&rf);
         }
 
         rf.state = State::Follower;
@@ -95,10 +95,8 @@ impl<C: Clone + serde::Serialize> Raft<C> {
             );
         }
 
-        self.persister.save_snapshot_and_state(
-            rf.persisted_state().into(),
-            rf.log.snapshot().1,
-        );
+        let (index_term, snapshot) = rf.log.snapshot();
+        self.persister.update_snapshot(index_term, snapshot);
 
         self.apply_command_signal.notify_one();
         InstallSnapshotReply {

+ 3 - 6
src/process_request_vote.rs

@@ -1,9 +1,6 @@
 use crate::{Raft, RequestVoteArgs, RequestVoteReply, State};
 
-// Command must be
-// 1. clone: they are copied to the persister.
-// 2. serialize: they are converted to bytes to persist.
-impl<Command: Clone + serde::Serialize> Raft<Command> {
+impl<Command> Raft<Command> {
     pub fn process_request_vote(
         &self,
         args: RequestVoteArgs,
@@ -38,7 +35,7 @@ impl<Command: Clone + serde::Serialize> Raft<Command> {
             rf.state = State::Follower;
 
             self.election.reset_election_timer();
-            self.persister.save_state(rf.persisted_state().into());
+            self.persister.save_term_vote(&rf);
         }
 
         let voted_for = rf.voted_for;
@@ -54,7 +51,7 @@ impl<Command: Clone + serde::Serialize> Raft<Command> {
             // current term. It does not hurt to update the timer again.
             // We do need to persist, though.
             self.election.reset_election_timer();
-            self.persister.save_state(rf.persisted_state().into());
+            self.persister.save_term_vote(&rf);
 
             RequestVoteReply {
                 term: args.term,

+ 16 - 17
src/raft.rs

@@ -11,14 +11,14 @@ use crate::daemon_env::{DaemonEnv, ThreadEnv};
 use crate::daemon_watch::{Daemon, DaemonWatch};
 use crate::election::ElectionState;
 use crate::heartbeats::{HeartbeatsDaemon, HEARTBEAT_INTERVAL};
-use crate::persister::PersistedRaftState;
 use crate::remote_context::RemoteContext;
 use crate::remote_peer::RemotePeer;
 use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
+use crate::storage::{RaftStorageTrait, SharedLogPersister};
 use crate::sync_log_entries::SyncLogEntriesComms;
 use crate::term_marker::TermMarker;
 use crate::verify_authority::VerifyAuthorityDaemon;
-use crate::{IndexTerm, Persister, RaftState, RemoteRaft, ReplicableCommand};
+use crate::{IndexTerm, RaftState, RemoteRaft, ReplicableCommand};
 
 #[derive(
     Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize,
@@ -35,7 +35,7 @@ pub struct Raft<Command> {
 
     pub(crate) me: Peer,
 
-    pub(crate) persister: Arc<dyn Persister>,
+    pub(crate) persister: SharedLogPersister<Command>,
 
     pub(crate) sync_log_entries_comms: SyncLogEntriesComms,
     pub(crate) apply_command_signal: Arc<Condvar>,
@@ -60,9 +60,8 @@ impl<Command: ReplicableCommand> Raft<Command> {
     pub fn new(
         peers: Vec<impl RemoteRaft<Command> + 'static>,
         me: usize,
-        persister: impl Persister + 'static,
+        storage: impl RaftStorageTrait,
         apply_command: impl ApplyCommandFnMut<Command>,
-        max_state_size_bytes: Option<usize>,
         request_snapshot: impl RequestSnapshotFnMut,
     ) -> Self {
         let peer_size = peers.len();
@@ -73,12 +72,11 @@ impl<Command: ReplicableCommand> Raft<Command> {
         // log.start() <= commit_index and commit_index < log.end() both hold.
         assert_eq!(state.commit_index + 1, state.log.end());
 
-        if let Ok(persisted_state) =
-            PersistedRaftState::try_from(persister.read_state())
-        {
-            state.current_term = persisted_state.current_term;
-            state.voted_for = persisted_state.voted_for;
-            state.log = persisted_state.log;
+        if let Ok(stored_state) = storage.read_state() {
+            state.current_term = stored_state.current_term();
+            state.voted_for = stored_state.voted_for();
+            stored_state.restore_log_array(&mut state.log);
+
             state.commit_index = state.log.start();
             // COMMIT_INDEX_INVARIANT, SNAPSHOT_INDEX_INVARIANT: the saved
             // snapshot must have a valid log.start() and log.end(). Thus
@@ -95,7 +93,9 @@ impl<Command: ReplicableCommand> Raft<Command> {
         let election = Arc::new(ElectionState::create());
         election.reset_election_timer();
 
-        let persister = Arc::new(persister);
+        let storage_monitor =
+            storage.log_compaction_required().then(|| storage.monitor());
+        let persister: SharedLogPersister<Command> = storage.persister();
         let term_marker = TermMarker::create(
             inner_state.clone(),
             election.clone(),
@@ -164,7 +164,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             .create_daemon(Daemon::VerifyAuthority, verify_authority_daemon);
         // Running in a standalone thread.
         let snapshot_daemon =
-            this.run_snapshot_daemon(max_state_size_bytes, request_snapshot);
+            this.run_snapshot_daemon(storage_monitor, request_snapshot);
         daemon_watch.create_daemon(Daemon::Snapshot, snapshot_daemon);
         // Running in a standalone thread.
         let sync_log_entry_daemon =
@@ -216,7 +216,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         }
 
         let index = rf.log.add_command(term, command);
-        self.persister.save_state(rf.persisted_state().into());
+        self.persister.append_one_entry(rf.log.at(index));
 
         self.sync_log_entries_comms.update_followers(index);
 
@@ -284,7 +284,7 @@ impl RaftJoinHandle {
 
 #[cfg(test)]
 mod tests {
-    use crate::utils::do_nothing::{DoNothingPersister, DoNothingRemoteRaft};
+    use crate::utils::do_nothing::{DoNothingRaftStorage, DoNothingRemoteRaft};
     use crate::ApplyCommandMessage;
 
     use super::*;
@@ -309,9 +309,8 @@ mod tests {
         let raft = Raft::new(
             vec![DoNothingRemoteRaft {}; peer_size],
             me,
-            DoNothingPersister {},
+            DoNothingRaftStorage {},
             |_: ApplyCommandMessage<i32>| {},
-            None,
             |_| {},
         );
 

+ 8 - 3
src/remote_context.rs

@@ -106,7 +106,9 @@ mod tests {
     use crate::election::ElectionState;
     use crate::remote_peer::RemotePeer;
     use crate::term_marker::TermMarker;
-    use crate::utils::do_nothing::{DoNothingPersister, DoNothingRemoteRaft};
+    use crate::utils::do_nothing::{
+        DoNothingRaftStoragePersister, DoNothingRemoteRaft,
+    };
     use crate::verify_authority::VerifyAuthorityDaemon;
     use crate::{Peer, RaftState};
 
@@ -117,8 +119,11 @@ mod tests {
         let rf = Arc::new(Mutex::new(RaftState::<i32>::create(1, Peer(0))));
         let election = Arc::new(ElectionState::create());
         let verify_authority_daemon = VerifyAuthorityDaemon::create(1);
-        let term_marker =
-            TermMarker::create(rf, election, Arc::new(DoNothingPersister));
+        let term_marker = TermMarker::create(
+            rf,
+            election,
+            Arc::new(DoNothingRaftStoragePersister {}),
+        );
         let remote_peer = RemotePeer::create(
             Peer(0),
             DoNothingRemoteRaft,

+ 10 - 15
src/snapshot.rs

@@ -6,6 +6,7 @@ use parking_lot::{Condvar, Mutex};
 
 use crate::check_or_record;
 use crate::daemon_env::ErrorKind;
+use crate::storage::RaftStorageMonitorTrait;
 use crate::{Index, Raft};
 
 #[derive(Clone, Debug, Default)]
@@ -123,16 +124,11 @@ impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
     /// snapshot will be saved in a temporary space. This daemon will wake up,
     /// apply the snapshot into the log and discard log entries before the
     /// snapshot. There is no guarantee that the snapshot will be applied.
-    pub(crate) fn run_snapshot_daemon(
+    pub(crate) fn run_snapshot_daemon<T: RaftStorageMonitorTrait>(
         &mut self,
-        max_state_size: Option<usize>,
+        storage_monitor: Option<T>,
         mut request_snapshot: impl RequestSnapshotFnMut,
     ) -> impl FnOnce() {
-        let max_state_size = match max_state_size {
-            Some(max_state_size) => max_state_size,
-            None => usize::MAX,
-        };
-
         let parker = Parker::new();
         let unparker = parker.unparker().clone();
         self.snapshot_daemon.unparker.replace(unparker.clone());
@@ -144,7 +140,7 @@ impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
         let snapshot_daemon = self.snapshot_daemon.clone();
 
         log::info!("{:?} snapshot daemon running ...", me);
-        let snapshot_daemon = move || loop {
+        let snapshot_daemon = move |storage_monitor: T| loop {
             parker.park();
             if !keep_running.load(Ordering::Acquire) {
                 log::info!("{:?} snapshot daemon done.", me);
@@ -152,11 +148,12 @@ impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
                 // Explicitly drop every thing.
                 drop(keep_running);
                 drop(rf);
+                drop(storage_monitor);
                 drop(persister);
                 drop(snapshot_daemon);
                 break;
             }
-            if persister.state_size() >= max_state_size {
+            if storage_monitor.should_compact_log_now() {
                 let log_start = rf.lock().log.first_index_term();
                 let snapshot = {
                     let mut snapshot =
@@ -211,15 +208,13 @@ impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
                 // smaller than commit_index. This is the only place where
                 // log.start() changes.
                 rf.log.shift(snapshot.last_included_index, snapshot.data);
-                persister.save_snapshot_and_state(
-                    rf.persisted_state().into(),
-                    rf.log.snapshot().1,
-                );
+                let (index_term, snapshot) = rf.log.snapshot();
+                persister.update_snapshot(index_term, snapshot);
             }
         };
         move || {
-            if max_state_size != usize::MAX {
-                snapshot_daemon()
+            if let Some(storage_monitor) = storage_monitor {
+                snapshot_daemon(storage_monitor)
             }
         }
     }

+ 6 - 6
src/term_marker.rs

@@ -1,25 +1,25 @@
 use std::sync::Arc;
 
 use parking_lot::Mutex;
-use serde::Serialize;
 
 use crate::election::ElectionState;
-use crate::{Persister, RaftState, State, Term};
+use crate::storage::SharedLogPersister;
+use crate::{RaftState, State, Term};
 
 /// A closure that updates the `Term` of the `RaftState`.
 #[derive(Clone)]
 pub(crate) struct TermMarker<Command> {
     rf: Arc<Mutex<RaftState<Command>>>,
     election: Arc<ElectionState>,
-    persister: Arc<dyn Persister>,
+    persister: SharedLogPersister<Command>,
 }
 
-impl<Command: Clone + Serialize> TermMarker<Command> {
+impl<Command> TermMarker<Command> {
     /// Create a `TermMarker` that can be passed to async tasks.
     pub fn create(
         rf: Arc<Mutex<RaftState<Command>>>,
         election: Arc<ElectionState>,
-        persister: Arc<dyn Persister>,
+        persister: SharedLogPersister<Command>,
     ) -> Self {
         Self {
             rf,
@@ -36,7 +36,7 @@ impl<Command: Clone + Serialize> TermMarker<Command> {
             rf.state = State::Follower;
 
             self.election.reset_election_timer();
-            self.persister.save_state(rf.persisted_state().into());
+            self.persister.save_term_vote(&rf);
         }
     }
 }

+ 2 - 9
src/utils/integration_test.rs

@@ -1,10 +1,9 @@
 #![cfg(feature = "integration-test")]
 
 use crate::{
-    AppendEntriesArgs, AppendEntriesReply, IndexTerm, Peer, Persister, Raft,
-    RequestVoteArgs, RequestVoteReply, Term,
+    AppendEntriesArgs, AppendEntriesReply, IndexTerm, Peer, RequestVoteArgs,
+    RequestVoteReply, Term,
 };
-use std::sync::Arc;
 
 pub fn make_request_vote_args(
     term: Term,
@@ -51,9 +50,3 @@ pub fn unpack_append_entries_args<T>(
 pub fn unpack_append_entries_reply(reply: AppendEntriesReply) -> (Term, bool) {
     (reply.term, reply.success)
 }
-
-impl<Command> Raft<Command> {
-    pub fn persister(&self) -> Arc<dyn Persister> {
-        self.persister.clone()
-    }
-}

+ 3 - 4
test_configs/src/interceptor/mod.rs

@@ -16,7 +16,7 @@ use ruaft::{
     RequestVoteReply,
 };
 
-use crate::Persister;
+use crate::InMemoryStorage;
 
 type RaftId = usize;
 
@@ -276,9 +276,8 @@ pub fn make_config(server_count: usize, max_state: Option<usize>) -> Config {
         })
         .collect();
     for (index, client_vec) in clients.iter().enumerate() {
-        let persister = Persister::new();
-        let kv_server =
-            KVServer::new(client_vec.to_vec(), index, persister, max_state);
+        let storage = InMemoryStorage::create(max_state.unwrap_or(usize::MAX));
+        let kv_server = KVServer::new(client_vec.to_vec(), index, storage);
         kv_servers.push(kv_server);
     }
 

+ 14 - 23
test_configs/src/kvraft/config.rs

@@ -8,7 +8,7 @@ use rand::thread_rng;
 use kvraft::Clerk;
 use kvraft::KVServer;
 
-use crate::{register_kv_server, register_server, Persister, RpcClient};
+use crate::{register_kv_server, register_server, InMemoryStorage, RpcClient};
 
 struct ConfigState {
     kv_servers: Vec<Option<Arc<KVServer>>>,
@@ -19,7 +19,7 @@ pub struct Config {
     network: Arc<Mutex<labrpc::Network>>,
     server_count: usize,
     state: Mutex<ConfigState>,
-    storage: Mutex<Vec<Option<Persister>>>,
+    storage: Mutex<Vec<InMemoryStorage>>,
     maxraftstate: usize,
 }
 
@@ -52,12 +52,9 @@ impl Config {
             }
         }
 
-        let persister = self.storage.lock()[index]
-            .take()
-            .expect("A persister must be present to create a raft server");
+        let storage = self.storage.lock()[index].clone();
 
-        let kv =
-            KVServer::new(clients, index, persister, Some(self.maxraftstate));
+        let kv = KVServer::new(clients, index, storage);
         self.state.lock().kv_servers[index].replace(kv.clone());
 
         let raft = kv.raft().clone();
@@ -153,13 +150,12 @@ impl Config {
         }
 
         if let Some(kv_server) = self.state.lock().kv_servers[index].take() {
-            let persister = kv_server.raft().persister();
-            let data = Persister::downcast_unsafe(persister.as_ref()).read();
+            let data = self.storage.lock()[index].save();
             kv_server.kill();
 
-            let persister = Persister::new();
-            persister.restore(data);
-            self.storage.lock()[index] = Some(persister);
+            let storage = InMemoryStorage::create(self.maxraftstate);
+            storage.restore(data);
+            self.storage.lock()[index] = storage;
         }
     }
 
@@ -253,16 +249,11 @@ impl Config {
     fn check_size(
         &self,
         upper: usize,
-        size_fn: impl Fn(&Persister) -> usize,
+        size_fn: impl Fn(&InMemoryStorage) -> usize,
     ) -> Result<(), String> {
         let mut over_limits = String::new();
-        for (index, p) in self.state.lock().kv_servers.iter().enumerate() {
-            let p = p
-                .as_ref()
-                .expect("KV server must be running to check size")
-                .raft()
-                .persister();
-            let size = size_fn(Persister::downcast_unsafe(p.as_ref()));
+        for (index, storage) in self.storage.lock().iter().enumerate() {
+            let size = size_fn(storage);
             if size > upper {
                 let str = format!(" (index {}, size {})", index, size);
                 over_limits.push_str(&str);
@@ -278,11 +269,11 @@ impl Config {
     }
 
     pub fn check_log_size(&self, upper: usize) -> Result<(), String> {
-        self.check_size(upper, ruaft::Persister::state_size)
+        self.check_size(upper, InMemoryStorage::state_size)
     }
 
     pub fn check_snapshot_size(&self, upper: usize) -> Result<(), String> {
-        self.check_size(upper, Persister::snapshot_size)
+        self.check_size(upper, InMemoryStorage::snapshot_size)
     }
 }
 
@@ -306,7 +297,7 @@ pub fn make_config(
     let storage = Mutex::new(vec![]);
     storage
         .lock()
-        .resize_with(server_count, || Some(Persister::new()));
+        .resize_with(server_count, || InMemoryStorage::create(maxraftstate));
 
     let cfg = Config {
         network,

+ 10 - 12
test_configs/src/raft/config.rs

@@ -8,10 +8,11 @@ use anyhow::{anyhow, bail};
 use parking_lot::Mutex;
 use rand::{thread_rng, Rng};
 
-use ruaft::{ApplyCommandMessage, Persister, Raft, Term};
+use ruaft::{ApplyCommandMessage, Raft, Term};
 
 use crate::register_server;
 use crate::utils::{sleep_millis, NO_SNAPSHOT};
+use crate::InMemoryStorage;
 
 struct ConfigState {
     rafts: Vec<Option<Raft<i32>>>,
@@ -22,7 +23,7 @@ struct LogState {
     committed_logs: Vec<Vec<i32>>,
     results: Vec<Result<()>>,
     max_index: usize,
-    saved: Vec<Option<crate::Persister>>,
+    saved: Vec<InMemoryStorage>,
 }
 
 pub struct Config {
@@ -303,13 +304,13 @@ impl Config {
         // the apply command function.
         let Some(raft) = raft else { return };
 
-        let data = raft.persister().read_state();
+        let data = self.log.lock().saved[index].save();
         raft.kill().join();
 
         let mut log = self.log.lock();
-        let persister = crate::Persister::new();
-        persister.save_state(data);
-        log.saved[index] = Some(persister);
+        let storage = InMemoryStorage::create(usize::MAX);
+        storage.restore(data);
+        log.saved[index] = storage;
     }
 
     pub fn start1(&self, index: usize) -> Result<()> {
@@ -327,17 +328,14 @@ impl Config {
                 )))
             }
         }
-        let persister = self.log.lock().saved[index]
-            .take()
-            .expect("A persister must be present to create a raft server");
+        let storage = self.log.lock().saved[index].clone();
 
         let log = self.log.clone();
         let raft = Raft::new(
             clients,
             index,
-            persister,
+            storage,
             move |message| Self::apply_command(log.clone(), index, message),
-            None,
             NO_SNAPSHOT,
         );
         self.state.lock().rafts[index].replace(raft.clone());
@@ -493,7 +491,7 @@ pub fn make_config(
     });
 
     let mut saved = vec![];
-    saved.resize_with(server_count, || Some(crate::Persister::new()));
+    saved.resize_with(server_count, || InMemoryStorage::create(usize::MAX));
     let log = Arc::new(Mutex::new(LogState {
         committed_logs: vec![vec![]; server_count],
         results: vec![],

+ 2 - 3
test_configs/src/rpcs.rs

@@ -216,7 +216,7 @@ pub fn register_kv_server<
 
 #[cfg(test)]
 mod tests {
-    use ruaft::utils::do_nothing::DoNothingPersister;
+    use ruaft::utils::do_nothing::DoNothingRaftStorage;
     use ruaft::utils::integration_test::{
         make_append_entries_args, make_request_vote_args,
         unpack_append_entries_reply, unpack_request_vote_reply,
@@ -240,9 +240,8 @@ mod tests {
             let raft = Raft::new(
                 vec![RpcClient(client)],
                 0,
-                DoNothingPersister,
+                DoNothingRaftStorage,
                 |_: ApplyCommandMessage<i32>| {},
-                None,
                 crate::utils::NO_SNAPSHOT,
             );
             register_server(raft, name, network.as_ref())?;