11 Commity d437abc009 ... 2901e7a443

Autor SHA1 Wiadomość Data
  Jing Yang 2901e7a443 Rename log_persister to internal. 2 lat temu
  Jing Yang bca6f94695 Upgrade durio to the new storage system. 2 lat temu
  Jing Yang 83ba7e894f Release space immediately after snapshot in tests. 2 lat temu
  Jing Yang 8db888f64d Delete persister. 2 lat temu
  Jing Yang f879688a6b Save snapshot term with snapshot. 2 lat temu
  Jing Yang 5fa583a055 Replace persister with storage. 2 lat temu
  Jing Yang df79979c08 Add a type_hint() helper method. 2 lat temu
  Jing Yang 19dd0a5739 Bridge the storage interface to internal types. 2 lat temu
  Jing Yang 807b8ceee7 Add an in-memory implementation of Raft storage. 2 lat temu
  Jing Yang eae9fd0ccc Implement a do-nothing storage. 2 lat temu
  Jing Yang b80f04d31a Define the storage interface. 2 lat temu

+ 2 - 0
Cargo.toml

@@ -50,4 +50,6 @@ default-members = [
     ".",
     "kvraft",
     "linearizability",
+    "test_configs",
+    "test_utils",
 ]

+ 1 - 1
durio/src/main.rs

@@ -13,9 +13,9 @@ use crate::kv_service::create_async_clerk;
 use crate::run::run_kv_instance;
 
 mod kv_service;
-mod persister;
 mod raft_service;
 mod run;
+mod storage;
 mod utils;
 
 #[derive(Deserialize, Serialize)]

+ 0 - 16
durio/src/persister.rs

@@ -1,16 +0,0 @@
-#[derive(Default)]
-pub struct DoNothingPersister {}
-
-impl ruaft::Persister for DoNothingPersister {
-    fn read_state(&self) -> bytes::Bytes {
-        bytes::Bytes::new()
-    }
-
-    fn save_state(&self, _data: bytes::Bytes) {}
-
-    fn state_size(&self) -> usize {
-        0
-    }
-
-    fn save_snapshot_and_state(&self, _state: bytes::Bytes, _snapshot: &[u8]) {}
-}

+ 3 - 3
durio/src/run.rs

@@ -4,8 +4,8 @@ use std::sync::Arc;
 use kvraft::KVServer;
 
 use crate::kv_service::start_kv_service_server;
-use crate::persister::DoNothingPersister;
 use crate::raft_service::{start_raft_service_server, LazyRaftServiceClient};
+use crate::storage::DoNothingRaftStorage;
 
 pub(crate) async fn run_kv_instance(
     addr: SocketAddr,
@@ -19,9 +19,9 @@ pub(crate) async fn run_kv_instance(
         remote_rafts.push(LazyRaftServiceClient::new(raft_peer));
     }
 
-    let persister = DoNothingPersister::default();
+    let storage = DoNothingRaftStorage::default();
 
-    let kv_server = KVServer::new(remote_rafts, me, persister, None);
+    let kv_server = KVServer::new(remote_rafts, me, storage);
     let raft = kv_server.raft().clone();
 
     start_raft_service_server(local_raft_peer, raft).await?;

+ 69 - 0
durio/src/storage.rs

@@ -0,0 +1,69 @@
+use ruaft::storage::{
+    RaftLogEntryRef, RaftStorageMonitorTrait, RaftStoragePersisterTrait,
+    RaftStorageTrait, RaftStoredState,
+};
+use ruaft::{Index, Term};
+
+#[derive(Default)]
+pub struct DoNothingRaftStorage;
+
+impl RaftStorageTrait for DoNothingRaftStorage {
+    type RaftStoragePersister<LogEntry: RaftLogEntryRef> =
+        DoNothingRaftStoragePersister;
+    type RaftStorageMonitor = DoNothingRaftStorageMonitor;
+
+    fn persister<LogEntry>(
+        self,
+    ) -> std::sync::Arc<DoNothingRaftStoragePersister>
+    where
+        LogEntry: RaftLogEntryRef,
+    {
+        std::sync::Arc::new(DoNothingRaftStoragePersister)
+    }
+
+    fn read_state(&self) -> std::io::Result<RaftStoredState> {
+        Ok(RaftStoredState {
+            current_term: Term(0),
+            voted_for: "".to_string(),
+            log: vec![],
+            snapshot_index: 0,
+            snapshot_term: Term(0),
+            snapshot: vec![],
+        })
+    }
+
+    fn log_compaction_required(&self) -> bool {
+        false
+    }
+
+    fn monitor(&self) -> DoNothingRaftStorageMonitor {
+        DoNothingRaftStorageMonitor
+    }
+}
+
+pub struct DoNothingRaftStorageMonitor;
+
+impl RaftStorageMonitorTrait for DoNothingRaftStorageMonitor {
+    fn should_compact_log_now(&self) -> bool {
+        return false;
+    }
+}
+
+pub struct DoNothingRaftStoragePersister;
+
+impl<LogEntry: RaftLogEntryRef> RaftStoragePersisterTrait<LogEntry>
+    for DoNothingRaftStoragePersister
+{
+    fn save_term_vote(&self, _term: Term, _voted_for: String) {}
+
+    fn append_one_entry(&self, _entry: &LogEntry) {}
+
+    fn append_entries<'a, LogEntryList>(&self, _entries: LogEntryList)
+    where
+        LogEntry: 'a,
+        LogEntryList: IntoIterator<Item = &'a LogEntry>,
+    {
+    }
+
+    fn update_snapshot(&self, _index: Index, _term: Term, _snapshot: &[u8]) {}
+}

+ 7 - 15
kvraft/src/server.rs

@@ -9,9 +9,9 @@ use futures::FutureExt;
 use parking_lot::Mutex;
 use serde_derive::{Deserialize, Serialize};
 
+use ruaft::storage::RaftStorageTrait;
 use ruaft::{
-    ApplyCommandMessage, Index, Persister, Raft, RemoteRaft, Term,
-    VerifyAuthorityResult,
+    ApplyCommandMessage, Index, Raft, RemoteRaft, Term, VerifyAuthorityResult,
 };
 #[cfg(all(not(test), feature = "integration-test"))]
 use test_utils::thread_local_logger::LocalLogger;
@@ -111,8 +111,7 @@ impl KVServer {
     pub fn new(
         servers: Vec<impl RemoteRaft<UniqueKVOp> + 'static>,
         me: usize,
-        persister: impl Persister + 'static,
-        max_state_size_bytes: Option<usize>,
+        storage: impl RaftStorageTrait,
     ) -> Arc<Self> {
         let (tx, rx) = channel();
         let apply_command = move |message| {
@@ -123,17 +122,10 @@ impl KVServer {
         let ret = Arc::new(Self {
             me,
             state: Mutex::new(KVServerState::default()),
-            rf: Raft::new(
-                servers,
-                me,
-                persister,
-                apply_command,
-                max_state_size_bytes,
-                {
-                    let snapshot_holder = snapshot_holder.clone();
-                    move |index| snapshot_holder.request_snapshot(index)
-                },
-            ),
+            rf: Raft::new(servers, me, storage, apply_command, {
+                let snapshot_holder = snapshot_holder.clone();
+                move |index| snapshot_holder.request_snapshot(index)
+            }),
             keep_running: AtomicBool::new(true),
         });
         ret.process_command(snapshot_holder, rx);

+ 9 - 8
src/election.rs

@@ -6,12 +6,13 @@ use parking_lot::{Condvar, Mutex};
 use rand::{thread_rng, Rng};
 
 use crate::remote_context::RemoteContext;
+use crate::storage::SharedLogPersister;
 use crate::sync_log_entries::SyncLogEntriesComms;
 use crate::utils::{retry_rpc, RPC_DEADLINE};
 use crate::verify_authority::VerifyAuthorityDaemon;
 use crate::{
-    IndexTerm, Peer, Persister, Raft, RaftState, ReplicableCommand,
-    RequestVoteArgs, State, Term,
+    IndexTerm, Peer, Raft, RaftState, ReplicableCommand, RequestVoteArgs,
+    State, Term,
 };
 
 struct VersionedDeadline {
@@ -82,7 +83,7 @@ struct ElectionCandidate<Command> {
     election: Arc<ElectionState>,
     new_log_entry: SyncLogEntriesComms,
     verify_authority_daemon: VerifyAuthorityDaemon,
-    persister: Arc<dyn Persister>,
+    persister: SharedLogPersister<Command>,
     thread_pool: tokio::runtime::Handle,
 }
 
@@ -95,9 +96,8 @@ enum QuorumOrCancelled {
 
 // Command must be
 // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
-// 1. clone: they are copied to the persister.
-// 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
-// 3. serialize: they are converted to bytes to persist.
+// 1. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
+// 2. serialize: they are converted to bytes to persist.
 impl<Command: ReplicableCommand> Raft<Command> {
     /// Runs the election timer daemon that triggers elections.
     ///
@@ -317,7 +317,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             rf.voted_for = Some(me);
             rf.state = State::Candidate;
 
-            candidate.persister.save_state(rf.persisted_state().into());
+            candidate.persister.save_term_vote(&rf);
 
             (rf.current_term, rf.log.last_index_term().unpack())
         };
@@ -431,7 +431,8 @@ impl<Command: ReplicableCommand> Raft<Command> {
             if rf.commit_index != rf.log.last_index_term().index {
                 // Create a sentinel commit at the start of the term.
                 sentinel_commit_index = rf.log.add_term_change_entry(term);
-                this.persister.save_state(rf.persisted_state().into());
+                this.persister
+                    .append_one_entry(rf.log.at(sentinel_commit_index));
             } else {
                 sentinel_commit_index = rf.commit_index;
             }

+ 1 - 2
src/lib.rs

@@ -5,7 +5,6 @@ pub use crate::apply_command::ApplyCommandMessage;
 pub use crate::index_term::IndexTerm;
 pub use crate::log_array::Index;
 pub use crate::messages::*;
-pub use crate::persister::Persister;
 pub use crate::raft::{Raft, Term};
 pub use crate::remote_raft::RemoteRaft;
 pub use crate::replicable_command::ReplicableCommand;
@@ -26,7 +25,6 @@ mod index_term;
 mod log_array;
 mod messages;
 mod peer_progress;
-mod persister;
 mod process_append_entries;
 mod process_install_snapshot;
 mod process_request_vote;
@@ -37,6 +35,7 @@ mod remote_peer;
 mod remote_raft;
 mod replicable_command;
 mod snapshot;
+pub mod storage;
 mod sync_log_entries;
 mod term_marker;
 pub mod utils;

+ 0 - 63
src/persister.rs

@@ -1,63 +0,0 @@
-use std::convert::TryFrom;
-
-use bytes::Bytes;
-use serde::de::DeserializeOwned;
-use serde::ser::Serialize;
-use serde_derive::{Deserialize, Serialize};
-
-use crate::log_array::LogArray;
-use crate::{Peer, RaftState, Term};
-
-/// An object that saves bytes to permanent storage.
-///
-/// When the methods of this trait returns, data should have been persisted to
-/// the storage. These methods should never return failure except panicking.
-/// They should not block forever, either.
-pub trait Persister: Send + Sync {
-    fn read_state(&self) -> Bytes;
-    fn save_state(&self, bytes: Bytes);
-    fn state_size(&self) -> usize;
-
-    fn save_snapshot_and_state(&self, state: Bytes, snapshot: &[u8]);
-}
-
-#[derive(Serialize, Deserialize)]
-pub(crate) struct PersistedRaftState<Command> {
-    pub current_term: Term,
-    pub voted_for: Option<Peer>,
-    pub log: LogArray<Command>,
-}
-
-impl<Command: Clone, T: AsRef<RaftState<Command>>> From<T>
-    for PersistedRaftState<Command>
-{
-    fn from(raft_state: T) -> Self {
-        Self::from(raft_state.as_ref())
-    }
-}
-
-impl<Command: Clone> From<&RaftState<Command>> for PersistedRaftState<Command> {
-    fn from(raft_state: &RaftState<Command>) -> Self {
-        Self {
-            current_term: raft_state.current_term,
-            voted_for: raft_state.voted_for,
-            log: raft_state.log.clone(),
-        }
-    }
-}
-
-impl<Command: DeserializeOwned> TryFrom<Bytes> for PersistedRaftState<Command> {
-    type Error = bincode::Error;
-
-    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
-        bincode::deserialize(value.as_ref())
-    }
-}
-
-impl<Command: Serialize> From<PersistedRaftState<Command>> for Bytes {
-    fn from(value: PersistedRaftState<Command>) -> Bytes {
-        bincode::serialize(&value)
-            .expect("Serialization should not fail")
-            .into()
-    }
-}

+ 12 - 9
src/process_append_entries.rs

@@ -5,9 +5,8 @@ use crate::{
 };
 
 // Command must be
-// 1. clone: they are copied to the persister.
-// 2. serialize: they are converted to bytes to persist.
-impl<Command: Clone + serde::Serialize> Raft<Command> {
+// 1. serialize: they are converted to bytes to persist.
+impl<Command: serde::Serialize> Raft<Command> {
     pub fn process_append_entries(
         &self,
         args: AppendEntriesArgs<Command>,
@@ -27,7 +26,7 @@ impl<Command: Clone + serde::Serialize> Raft<Command> {
         if rf.current_term < args.term {
             rf.current_term = args.term;
             rf.voted_for = None;
-            self.persister.save_state(rf.persisted_state().into());
+            self.persister.save_term_vote(&rf);
         }
 
         rf.state = State::Follower;
@@ -73,7 +72,8 @@ impl<Command: Clone + serde::Serialize> Raft<Command> {
 
         // COMMIT_INDEX_INVARIANT: Before this loop, we can safely assume that
         // commit_index < log.end().
-        for (i, entry) in args.entries.iter().enumerate() {
+        let mut first_mismatch = rf.log.end();
+        for (i, entry) in args.entries.into_iter().enumerate() {
             let index = i + args.prev_log_index + 1;
             if rf.log.end() > index {
                 if rf.log.at(index).term != entry.term {
@@ -87,20 +87,23 @@ impl<Command: Clone + serde::Serialize> Raft<Command> {
                     // checked that index is strictly larger than commit_index.
                     // The condition that commit_index < log.end() holds.
                     rf.log.truncate(index);
-                    rf.log.push(entry.clone());
+                    rf.log.push(entry);
+                    first_mismatch = std::cmp::min(first_mismatch, index);
                 }
                 // COMMIT_INDEX_INVARIANT: Otherwise log.end() does not move.
                 // The condition that commit_index < log.end() holds.
             } else {
                 // COMMIT_INDEX_INVARIANT: log.end() grows larger. The condition
                 // that commit_index < log.end() holds.
-                rf.log.push(entry.clone());
+                rf.log.push(entry);
+                first_mismatch = std::cmp::min(first_mismatch, index);
             }
         }
         // COMMIT_INDEX_INVARIANT: After the loop, commit_index < log.end()
         // must still hold.
-
-        self.persister.save_state(rf.persisted_state().into());
+        if first_mismatch <= rf.log.end() {
+            self.persister.append_entries(rf.log.after(first_mismatch));
+        }
 
         // SNAPSHOT_INDEX_INVARIANT: commit_index increases (or stays unchanged)
         // after this if-statement. log.start() <= commit_index still holds.

+ 4 - 6
src/process_install_snapshot.rs

@@ -2,7 +2,7 @@ use crate::check_or_record;
 use crate::daemon_env::ErrorKind;
 use crate::{InstallSnapshotArgs, InstallSnapshotReply, Raft, State};
 
-impl<C: Clone + serde::Serialize> Raft<C> {
+impl<C> Raft<C> {
     pub fn process_install_snapshot(
         &self,
         args: InstallSnapshotArgs,
@@ -25,7 +25,7 @@ impl<C: Clone + serde::Serialize> Raft<C> {
         if rf.current_term < args.term {
             rf.current_term = args.term;
             rf.voted_for = None;
-            self.persister.save_state(rf.persisted_state().into());
+            self.persister.save_term_vote(&rf);
         }
 
         rf.state = State::Follower;
@@ -95,10 +95,8 @@ impl<C: Clone + serde::Serialize> Raft<C> {
             );
         }
 
-        self.persister.save_snapshot_and_state(
-            rf.persisted_state().into(),
-            rf.log.snapshot().1,
-        );
+        let (index_term, snapshot) = rf.log.snapshot();
+        self.persister.update_snapshot(index_term, snapshot);
 
         self.apply_command_signal.notify_one();
         InstallSnapshotReply {

+ 3 - 6
src/process_request_vote.rs

@@ -1,9 +1,6 @@
 use crate::{Raft, RequestVoteArgs, RequestVoteReply, State};
 
-// Command must be
-// 1. clone: they are copied to the persister.
-// 2. serialize: they are converted to bytes to persist.
-impl<Command: Clone + serde::Serialize> Raft<Command> {
+impl<Command> Raft<Command> {
     pub fn process_request_vote(
         &self,
         args: RequestVoteArgs,
@@ -38,7 +35,7 @@ impl<Command: Clone + serde::Serialize> Raft<Command> {
             rf.state = State::Follower;
 
             self.election.reset_election_timer();
-            self.persister.save_state(rf.persisted_state().into());
+            self.persister.save_term_vote(&rf);
         }
 
         let voted_for = rf.voted_for;
@@ -54,7 +51,7 @@ impl<Command: Clone + serde::Serialize> Raft<Command> {
             // current term. It does not hurt to update the timer again.
             // We do need to persist, though.
             self.election.reset_election_timer();
-            self.persister.save_state(rf.persisted_state().into());
+            self.persister.save_term_vote(&rf);
 
             RequestVoteReply {
                 term: args.term,

+ 16 - 17
src/raft.rs

@@ -11,14 +11,14 @@ use crate::daemon_env::{DaemonEnv, ThreadEnv};
 use crate::daemon_watch::{Daemon, DaemonWatch};
 use crate::election::ElectionState;
 use crate::heartbeats::{HeartbeatsDaemon, HEARTBEAT_INTERVAL};
-use crate::persister::PersistedRaftState;
 use crate::remote_context::RemoteContext;
 use crate::remote_peer::RemotePeer;
 use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
+use crate::storage::{RaftStorageTrait, SharedLogPersister};
 use crate::sync_log_entries::SyncLogEntriesComms;
 use crate::term_marker::TermMarker;
 use crate::verify_authority::VerifyAuthorityDaemon;
-use crate::{IndexTerm, Persister, RaftState, RemoteRaft, ReplicableCommand};
+use crate::{IndexTerm, RaftState, RemoteRaft, ReplicableCommand};
 
 #[derive(
     Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize,
@@ -35,7 +35,7 @@ pub struct Raft<Command> {
 
     pub(crate) me: Peer,
 
-    pub(crate) persister: Arc<dyn Persister>,
+    pub(crate) persister: SharedLogPersister<Command>,
 
     pub(crate) sync_log_entries_comms: SyncLogEntriesComms,
     pub(crate) apply_command_signal: Arc<Condvar>,
@@ -60,9 +60,8 @@ impl<Command: ReplicableCommand> Raft<Command> {
     pub fn new(
         peers: Vec<impl RemoteRaft<Command> + 'static>,
         me: usize,
-        persister: impl Persister + 'static,
+        storage: impl RaftStorageTrait,
         apply_command: impl ApplyCommandFnMut<Command>,
-        max_state_size_bytes: Option<usize>,
         request_snapshot: impl RequestSnapshotFnMut,
     ) -> Self {
         let peer_size = peers.len();
@@ -73,12 +72,11 @@ impl<Command: ReplicableCommand> Raft<Command> {
         // log.start() <= commit_index and commit_index < log.end() both hold.
         assert_eq!(state.commit_index + 1, state.log.end());
 
-        if let Ok(persisted_state) =
-            PersistedRaftState::try_from(persister.read_state())
-        {
-            state.current_term = persisted_state.current_term;
-            state.voted_for = persisted_state.voted_for;
-            state.log = persisted_state.log;
+        if let Ok(stored_state) = storage.read_state() {
+            state.current_term = stored_state.current_term();
+            state.voted_for = stored_state.voted_for();
+            stored_state.restore_log_array(&mut state.log);
+
             state.commit_index = state.log.start();
             // COMMIT_INDEX_INVARIANT, SNAPSHOT_INDEX_INVARIANT: the saved
             // snapshot must have a valid log.start() and log.end(). Thus
@@ -95,7 +93,9 @@ impl<Command: ReplicableCommand> Raft<Command> {
         let election = Arc::new(ElectionState::create());
         election.reset_election_timer();
 
-        let persister = Arc::new(persister);
+        let storage_monitor =
+            storage.log_compaction_required().then(|| storage.monitor());
+        let persister: SharedLogPersister<Command> = storage.persister();
         let term_marker = TermMarker::create(
             inner_state.clone(),
             election.clone(),
@@ -164,7 +164,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             .create_daemon(Daemon::VerifyAuthority, verify_authority_daemon);
         // Running in a standalone thread.
         let snapshot_daemon =
-            this.run_snapshot_daemon(max_state_size_bytes, request_snapshot);
+            this.run_snapshot_daemon(storage_monitor, request_snapshot);
         daemon_watch.create_daemon(Daemon::Snapshot, snapshot_daemon);
         // Running in a standalone thread.
         let sync_log_entry_daemon =
@@ -216,7 +216,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         }
 
         let index = rf.log.add_command(term, command);
-        self.persister.save_state(rf.persisted_state().into());
+        self.persister.append_one_entry(rf.log.at(index));
 
         self.sync_log_entries_comms.update_followers(index);
 
@@ -284,7 +284,7 @@ impl RaftJoinHandle {
 
 #[cfg(test)]
 mod tests {
-    use crate::utils::do_nothing::{DoNothingPersister, DoNothingRemoteRaft};
+    use crate::utils::do_nothing::{DoNothingRaftStorage, DoNothingRemoteRaft};
     use crate::ApplyCommandMessage;
 
     use super::*;
@@ -309,9 +309,8 @@ mod tests {
         let raft = Raft::new(
             vec![DoNothingRemoteRaft {}; peer_size],
             me,
-            DoNothingPersister {},
+            DoNothingRaftStorage {},
             |_: ApplyCommandMessage<i32>| {},
-            None,
             |_| {},
         );
 

+ 1 - 9
src/raft_state.rs

@@ -1,6 +1,4 @@
-use crate::{
-    log_array::LogArray, persister::PersistedRaftState, Index, Peer, Term,
-};
+use crate::{log_array::LogArray, Index, Peer, Term};
 
 #[derive(Copy, Clone, Debug, Eq, PartialEq)]
 pub(crate) enum State {
@@ -39,12 +37,6 @@ impl<Command> RaftState<Command> {
     }
 }
 
-impl<Command: Clone> RaftState<Command> {
-    pub fn persisted_state(&self) -> PersistedRaftState<Command> {
-        self.into()
-    }
-}
-
 impl<Command> RaftState<Command> {
     pub fn is_leader(&self) -> bool {
         self.state == State::Leader

+ 8 - 3
src/remote_context.rs

@@ -106,7 +106,9 @@ mod tests {
     use crate::election::ElectionState;
     use crate::remote_peer::RemotePeer;
     use crate::term_marker::TermMarker;
-    use crate::utils::do_nothing::{DoNothingPersister, DoNothingRemoteRaft};
+    use crate::utils::do_nothing::{
+        DoNothingRaftStoragePersister, DoNothingRemoteRaft,
+    };
     use crate::verify_authority::VerifyAuthorityDaemon;
     use crate::{Peer, RaftState};
 
@@ -117,8 +119,11 @@ mod tests {
         let rf = Arc::new(Mutex::new(RaftState::<i32>::create(1, Peer(0))));
         let election = Arc::new(ElectionState::create());
         let verify_authority_daemon = VerifyAuthorityDaemon::create(1);
-        let term_marker =
-            TermMarker::create(rf, election, Arc::new(DoNothingPersister));
+        let term_marker = TermMarker::create(
+            rf,
+            election,
+            Arc::new(DoNothingRaftStoragePersister {}),
+        );
         let remote_peer = RemotePeer::create(
             Peer(0),
             DoNothingRemoteRaft,

+ 10 - 15
src/snapshot.rs

@@ -6,6 +6,7 @@ use parking_lot::{Condvar, Mutex};
 
 use crate::check_or_record;
 use crate::daemon_env::ErrorKind;
+use crate::storage::RaftStorageMonitorTrait;
 use crate::{Index, Raft};
 
 #[derive(Clone, Debug, Default)]
@@ -123,16 +124,11 @@ impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
     /// snapshot will be saved in a temporary space. This daemon will wake up,
     /// apply the snapshot into the log and discard log entries before the
     /// snapshot. There is no guarantee that the snapshot will be applied.
-    pub(crate) fn run_snapshot_daemon(
+    pub(crate) fn run_snapshot_daemon<T: RaftStorageMonitorTrait>(
         &mut self,
-        max_state_size: Option<usize>,
+        storage_monitor: Option<T>,
         mut request_snapshot: impl RequestSnapshotFnMut,
     ) -> impl FnOnce() {
-        let max_state_size = match max_state_size {
-            Some(max_state_size) => max_state_size,
-            None => usize::MAX,
-        };
-
         let parker = Parker::new();
         let unparker = parker.unparker().clone();
         self.snapshot_daemon.unparker.replace(unparker.clone());
@@ -144,7 +140,7 @@ impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
         let snapshot_daemon = self.snapshot_daemon.clone();
 
         log::info!("{:?} snapshot daemon running ...", me);
-        let snapshot_daemon = move || loop {
+        let snapshot_daemon = move |storage_monitor: T| loop {
             parker.park();
             if !keep_running.load(Ordering::Acquire) {
                 log::info!("{:?} snapshot daemon done.", me);
@@ -152,11 +148,12 @@ impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
                 // Explicitly drop every thing.
                 drop(keep_running);
                 drop(rf);
+                drop(storage_monitor);
                 drop(persister);
                 drop(snapshot_daemon);
                 break;
             }
-            if persister.state_size() >= max_state_size {
+            if storage_monitor.should_compact_log_now() {
                 let log_start = rf.lock().log.first_index_term();
                 let snapshot = {
                     let mut snapshot =
@@ -211,15 +208,13 @@ impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
                 // smaller than commit_index. This is the only place where
                 // log.start() changes.
                 rf.log.shift(snapshot.last_included_index, snapshot.data);
-                persister.save_snapshot_and_state(
-                    rf.persisted_state().into(),
-                    rf.log.snapshot().1,
-                );
+                let (index_term, snapshot) = rf.log.snapshot();
+                persister.update_snapshot(index_term, snapshot);
             }
         };
         move || {
-            if max_state_size != usize::MAX {
-                snapshot_daemon()
+            if let Some(storage_monitor) = storage_monitor {
+                snapshot_daemon(storage_monitor)
             }
         }
     }

+ 32 - 0
src/storage/decode_and_encode.rs

@@ -0,0 +1,32 @@
+use serde::{Deserialize, Serialize};
+
+use crate::log_array::LogEntry;
+use crate::raft::Peer;
+
+pub(crate) fn encode_log_entry<Command: Serialize>(
+    log_entry: &LogEntry<Command>,
+) -> Vec<u8> {
+    bincode::serialize(log_entry).expect("Serialization should not fail")
+}
+
+pub(crate) fn decode_log_entry<'a, Command: Deserialize<'a>>(
+    stored: &'a [u8],
+) -> LogEntry<Command> {
+    bincode::deserialize(&stored).expect("Deserialization should never fail")
+}
+
+pub(crate) fn encode_voted_for(voted_for: &Option<Peer>) -> String {
+    match voted_for {
+        Some(Peer(n)) => n.to_string(),
+        None => "".to_owned(),
+    }
+}
+
+pub(crate) fn decode_voted_for(
+    stored: &str,
+) -> Result<Option<Peer>, std::num::ParseIntError> {
+    if stored.is_empty() {
+        return Ok(None);
+    }
+    stored.parse().map(|v| Some(Peer(v)))
+}

+ 158 - 0
src/storage/internal.rs

@@ -0,0 +1,158 @@
+use std::sync::Arc;
+
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+
+use crate::log_array::{LogArray, LogEntry};
+use crate::raft_state::RaftState;
+use crate::storage::decode_and_encode::{
+    decode_log_entry, decode_voted_for, encode_log_entry, encode_voted_for,
+};
+use crate::storage::{
+    RaftLogEntryRef, RaftStoragePersisterTrait, RaftStoredState,
+};
+use crate::{Index, IndexTerm, Peer, Term};
+
+/// The internal interface of log entry persister. It is similar to
+/// `RaftStoragePersisterTrait`, but with concrete and private types. It
+/// provides better ergonomics to developers of this project.
+///
+/// This trait cannot contain generic methods because it is made into a trait
+/// object in type `SharedLogPersister`. Trait objects are used to remove one
+/// more generic parameter on the overall `Raft` type.
+pub(crate) trait LogPersisterTrait<Command>: Send + Sync {
+    /// Save term and vote from the RaftState.
+    fn save_term_vote(&self, rf: &RaftState<Command>);
+
+    /// Save one log entry. Blocking until the data is persisted.
+    fn append_one_entry(&self, entry: &LogEntry<Command>);
+
+    /// Save many log entries. Blocking until the data is persisted.
+    fn append_entries(&self, entries: &[LogEntry<Command>]);
+
+    /// Save snapshot. Blocking until the data is persisted.
+    fn update_snapshot(&self, index_term: IndexTerm, snapshot: &[u8]);
+}
+
+/// A thin wrapper around `RaftStoragePersisterTrait`.
+impl<T, Command> LogPersisterTrait<Command> for T
+where
+    Command: Serialize,
+    T: RaftStoragePersisterTrait<LogEntry<Command>>,
+{
+    fn save_term_vote(&self, rf: &RaftState<Command>) {
+        <T as RaftStoragePersisterTrait<LogEntry<Command>>>::save_term_vote(
+            self,
+            rf.current_term,
+            encode_voted_for(&rf.voted_for),
+        )
+    }
+
+    fn append_one_entry(&self, entry: &LogEntry<Command>) {
+        <T as RaftStoragePersisterTrait<LogEntry<Command>>>::append_one_entry(
+            self, entry,
+        )
+    }
+
+    fn append_entries(&self, entries: &[LogEntry<Command>]) {
+        <T as RaftStoragePersisterTrait<LogEntry<Command>>>::append_entries(
+            self, entries,
+        )
+    }
+
+    fn update_snapshot(&self, index_term: IndexTerm, snapshot: &[u8]) {
+        <T as RaftStoragePersisterTrait<LogEntry<Command>>>::update_snapshot(
+            self,
+            index_term.index,
+            index_term.term,
+            snapshot,
+        )
+    }
+}
+
+/// The crate-internal interface that is exposed to other parts of this crate.
+pub(crate) type SharedLogPersister<Command> =
+    Arc<dyn LogPersisterTrait<Command>>;
+
+/// Adapter from the internal `LogEntry` type to the public interface.
+impl<Command: Serialize> RaftLogEntryRef for LogEntry<Command> {
+    fn index(&self) -> Index {
+        self.index
+    }
+
+    fn term(&self) -> Term {
+        self.term
+    }
+
+    fn command_bytes(&self) -> Vec<u8> {
+        encode_log_entry(self)
+    }
+}
+
+impl RaftStoredState {
+    pub(crate) fn current_term(&self) -> Term {
+        self.current_term
+    }
+
+    pub(crate) fn voted_for(&self) -> Option<Peer> {
+        decode_voted_for(&self.voted_for)
+            .expect("Persisted log should not contain error")
+    }
+
+    pub(crate) fn restore_log_array<Command: DeserializeOwned>(
+        self,
+        log_array: &mut LogArray<Command>,
+    ) {
+        log_array.reset(self.snapshot_index, self.snapshot_term, self.snapshot);
+        for entry in self.log.iter() {
+            log_array.push(decode_log_entry(&entry.command));
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::log_array::LogArray;
+    use crate::raft::Peer;
+    use crate::storage::decode_and_encode::{
+        encode_log_entry, encode_voted_for,
+    };
+    use crate::storage::{RaftStoredLogEntry, RaftStoredState};
+    use crate::{IndexTerm, Term};
+
+    #[test]
+    fn test_restore_log_array() {
+        let mut log_array = LogArray::create();
+        log_array.add_command(Term(1), 1i32);
+        log_array.add_command(Term(3), 7i32);
+        let stored = RaftStoredState {
+            current_term: Term(8),
+            voted_for: encode_voted_for(&Some(Peer(1))),
+            log: vec![
+                RaftStoredLogEntry {
+                    index: 1,
+                    term: Term(1),
+                    command: encode_log_entry(log_array.at(1)),
+                },
+                RaftStoredLogEntry {
+                    index: 2,
+                    term: Term(3),
+                    command: encode_log_entry(log_array.at(2)),
+                },
+            ],
+            snapshot_index: 0,
+            snapshot_term: Term(1),
+            snapshot: vec![0x09, 0x90],
+        };
+
+        let mut new_log_array: LogArray<i32> = LogArray::create();
+        stored.restore_log_array(&mut new_log_array);
+
+        assert_eq!(log_array.start(), new_log_array.start());
+        assert_eq!(log_array.end(), new_log_array.end());
+        assert_eq!(log_array.at(1).command(), new_log_array.at(1).command());
+        assert_eq!(log_array.at(2).command(), new_log_array.at(2).command());
+        assert_eq!(IndexTerm::pack(0, Term(1)), new_log_array.snapshot().0);
+        assert_eq!(&[0x09u8, 0x90u8], new_log_array.snapshot().1);
+    }
+}

+ 110 - 0
src/storage/mod.rs

@@ -0,0 +1,110 @@
+pub(crate) use internal::SharedLogPersister;
+
+use crate::{Index, Term};
+
+mod decode_and_encode;
+mod internal;
+
+/// A reference type that points to a Raft log entry. Used as input parameters
+/// in the storage interface `RaftStoragePersisterTrait`.
+/// This is to keep the implementation details of Raft log array separated from
+/// the public storage interface.
+pub trait RaftLogEntryRef {
+    fn index(&self) -> Index;
+    fn term(&self) -> Term;
+    fn command_bytes(&self) -> Vec<u8>;
+}
+
+/// An object that writes data to the underlying storage. A typical disk-based
+/// implementation could work as follows:
+/// 1. A file large enough to store a few integers: term, ID of voted for peer,
+/// and a pair of disk offsets of valid log entries.
+/// 2. A list of continuous disk blocks used to store an array of
+/// `RaftStoredLogEntry` bytes.
+/// 3. Another list of continuous disk blocks that stores the application
+/// snapshot.
+///
+/// TODO: Add default index range check implementation to `append_one_entry()`
+/// and `append_entries()`.
+pub trait RaftStoragePersisterTrait<LogEntry: RaftLogEntryRef>:
+    Send + Sync + 'static
+{
+    /// Save the term and vote to storage.
+    fn save_term_vote(&self, term: Term, voted_for: String);
+
+    /// Append one entry to the saved log, overriding the existing entry at the
+    /// same index if it is previously appended. Any existing entries after the
+    /// given index are discarded.
+    fn append_one_entry(&self, entry: &LogEntry);
+
+    /// Append a list of entries to the saved log, overriding existing entries
+    /// at the same indexes if they are previously appended. The indexes
+    /// specified in the input list must be in order and consecutive. Existing
+    /// entries at indexes after the input index range are discarded.
+    fn append_entries<'a, LogEntryList>(&self, entries: LogEntryList)
+    where
+        LogEntry: 'a,
+        LogEntryList: IntoIterator<Item = &'a LogEntry>;
+
+    /// Store the application snapshot. The snapshot is computed from all log
+    /// entries at and before `index`. After the snapshot is saved, any log
+    /// entry on or before index can be discarded.
+    fn update_snapshot(&self, index: Index, term: Term, snapshot: &[u8]);
+}
+
+/// An object that watches the underlying storage system and helps Raft decide
+/// if a log compaction, i.e. taking a snapshot, is needed.
+pub trait RaftStorageMonitorTrait: Send + 'static {
+    /// Returns true when the storage system requires a log compaction.
+    fn should_compact_log_now(&self) -> bool;
+}
+
+/// A concrete type that holds one log entry read from the storage.
+#[derive(Clone)]
+pub struct RaftStoredLogEntry {
+    pub index: Index,
+    pub term: Term,
+    pub command: Vec<u8>,
+}
+
+/// A concrete type that holds all information that is needed to restore the
+/// Raft log array and application state right after the instance starts.
+#[derive(Clone)]
+pub struct RaftStoredState {
+    pub current_term: Term,
+    pub voted_for: String,
+    pub log: Vec<RaftStoredLogEntry>,
+    pub snapshot_index: Index,
+    pub snapshot_term: Term,
+    pub snapshot: Vec<u8>,
+}
+
+/// An object that has everything Raft needs from a storage system.
+pub trait RaftStorageTrait {
+    type RaftStoragePersister<LogEntry: RaftLogEntryRef>: RaftStoragePersisterTrait<LogEntry>;
+    type RaftStorageMonitor: RaftStorageMonitorTrait;
+
+    /// Returns a persister that writes data to the underlying storage.
+    ///
+    /// `LogEntry` is not a trait generic parameter, but a method generic parameter,
+    /// because the implementation of this trait must accept any `RaftLogEntryRef`,
+    /// even though it is guaranteed that at runtime only one concrete subtype of
+    /// `RaftLogEntryRef` will be passed to the implementation.
+    fn persister<LogEntry: RaftLogEntryRef>(
+        self,
+    ) -> std::sync::Arc<Self::RaftStoragePersister<LogEntry>>;
+
+    /// Reads out the entire saved state, including term, vote, Raft logs and
+    /// the application snapshot.
+    fn read_state(&self) -> std::io::Result<RaftStoredState>;
+
+    /// Whether or not this storage system requires log compaction. Any
+    /// non-experimental storage must require log compaction.
+    fn log_compaction_required(&self) -> bool {
+        return true;
+    }
+
+    /// Returns a monitor that helps Raft decide when a compaction should happen
+    /// during the lifetime of the application.
+    fn monitor(&self) -> Self::RaftStorageMonitor;
+}

+ 6 - 6
src/term_marker.rs

@@ -1,25 +1,25 @@
 use std::sync::Arc;
 
 use parking_lot::Mutex;
-use serde::Serialize;
 
 use crate::election::ElectionState;
-use crate::{Persister, RaftState, State, Term};
+use crate::storage::SharedLogPersister;
+use crate::{RaftState, State, Term};
 
 /// A closure that updates the `Term` of the `RaftState`.
 #[derive(Clone)]
 pub(crate) struct TermMarker<Command> {
     rf: Arc<Mutex<RaftState<Command>>>,
     election: Arc<ElectionState>,
-    persister: Arc<dyn Persister>,
+    persister: SharedLogPersister<Command>,
 }
 
-impl<Command: Clone + Serialize> TermMarker<Command> {
+impl<Command> TermMarker<Command> {
     /// Create a `TermMarker` that can be passed to async tasks.
     pub fn create(
         rf: Arc<Mutex<RaftState<Command>>>,
         election: Arc<ElectionState>,
-        persister: Arc<dyn Persister>,
+        persister: SharedLogPersister<Command>,
     ) -> Self {
         Self {
             rf,
@@ -36,7 +36,7 @@ impl<Command: Clone + Serialize> TermMarker<Command> {
             rf.state = State::Follower;
 
             self.election.reset_election_timer();
-            self.persister.save_state(rf.persisted_state().into());
+            self.persister.save_term_vote(&rf);
         }
     }
 }

+ 69 - 20
src/utils/do_nothing.rs

@@ -1,30 +1,16 @@
 #![cfg(feature = "integration-test")]
 
 use async_trait::async_trait;
-use bytes::Bytes;
 
+use crate::storage::{
+    RaftLogEntryRef, RaftStorageMonitorTrait, RaftStoragePersisterTrait,
+    RaftStorageTrait, RaftStoredState,
+};
 use crate::{
-    AppendEntriesArgs, AppendEntriesReply, InstallSnapshotArgs,
-    InstallSnapshotReply, Persister, RemoteRaft, RequestVoteArgs,
-    RequestVoteReply,
+    AppendEntriesArgs, AppendEntriesReply, Index, InstallSnapshotArgs,
+    InstallSnapshotReply, RemoteRaft, RequestVoteArgs, RequestVoteReply, Term,
 };
 
-#[derive(Clone)]
-pub struct DoNothingPersister;
-impl Persister for DoNothingPersister {
-    fn read_state(&self) -> Bytes {
-        Bytes::new()
-    }
-
-    fn save_state(&self, _bytes: Bytes) {}
-
-    fn state_size(&self) -> usize {
-        0
-    }
-
-    fn save_snapshot_and_state(&self, _: Bytes, _: &[u8]) {}
-}
-
 #[derive(Clone)]
 pub struct DoNothingRemoteRaft;
 #[async_trait]
@@ -50,3 +36,66 @@ impl<Command: 'static + Send> RemoteRaft<Command> for DoNothingRemoteRaft {
         unimplemented!()
     }
 }
+
+pub struct DoNothingRaftStorage;
+
+impl RaftStorageTrait for DoNothingRaftStorage {
+    type RaftStoragePersister<LogEntry: RaftLogEntryRef> =
+        DoNothingRaftStoragePersister;
+    type RaftStorageMonitor = DoNothingRaftStorageMonitor;
+
+    fn persister<LogEntry>(
+        self,
+    ) -> std::sync::Arc<DoNothingRaftStoragePersister>
+    where
+        LogEntry: RaftLogEntryRef,
+    {
+        std::sync::Arc::new(DoNothingRaftStoragePersister)
+    }
+
+    fn read_state(&self) -> std::io::Result<RaftStoredState> {
+        Ok(RaftStoredState {
+            current_term: Term(0),
+            voted_for: "".to_string(),
+            log: vec![],
+            snapshot_index: 0,
+            snapshot_term: Term(0),
+            snapshot: vec![],
+        })
+    }
+
+    fn log_compaction_required(&self) -> bool {
+        false
+    }
+
+    fn monitor(&self) -> DoNothingRaftStorageMonitor {
+        DoNothingRaftStorageMonitor
+    }
+}
+
+pub struct DoNothingRaftStorageMonitor;
+
+impl RaftStorageMonitorTrait for DoNothingRaftStorageMonitor {
+    fn should_compact_log_now(&self) -> bool {
+        return false;
+    }
+}
+
+pub struct DoNothingRaftStoragePersister;
+
+impl<LogEntry: RaftLogEntryRef> RaftStoragePersisterTrait<LogEntry>
+    for DoNothingRaftStoragePersister
+{
+    fn save_term_vote(&self, _term: Term, _voted_for: String) {}
+
+    fn append_one_entry(&self, _entry: &LogEntry) {}
+
+    fn append_entries<'a, LogEntryList>(&self, _entries: LogEntryList)
+    where
+        LogEntry: 'a,
+        LogEntryList: IntoIterator<Item = &'a LogEntry>,
+    {
+    }
+
+    fn update_snapshot(&self, _index: Index, _term: Term, _snapshot: &[u8]) {}
+}

+ 2 - 9
src/utils/integration_test.rs

@@ -1,10 +1,9 @@
 #![cfg(feature = "integration-test")]
 
 use crate::{
-    AppendEntriesArgs, AppendEntriesReply, IndexTerm, Peer, Persister, Raft,
-    RequestVoteArgs, RequestVoteReply, Term,
+    AppendEntriesArgs, AppendEntriesReply, IndexTerm, Peer, RequestVoteArgs,
+    RequestVoteReply, Term,
 };
-use std::sync::Arc;
 
 pub fn make_request_vote_args(
     term: Term,
@@ -51,9 +50,3 @@ pub fn unpack_append_entries_args<T>(
 pub fn unpack_append_entries_reply(reply: AppendEntriesReply) -> (Term, bool) {
     (reply.term, reply.success)
 }
-
-impl<Command> Raft<Command> {
-    pub fn persister(&self) -> Arc<dyn Persister> {
-        self.persister.clone()
-    }
-}

+ 597 - 0
test_configs/src/in_memory_storage.rs

@@ -0,0 +1,597 @@
+//! An in-memory simulation of storage operations.
+use std::collections::VecDeque;
+use std::mem::size_of;
+use std::sync::Arc;
+
+use parking_lot::Mutex;
+
+use ruaft::storage::{
+    RaftLogEntryRef, RaftStorageMonitorTrait, RaftStoragePersisterTrait,
+    RaftStorageTrait, RaftStoredLogEntry, RaftStoredState,
+};
+use ruaft::{Index, Term};
+
+#[derive(Clone)]
+pub struct State {
+    current_term: Term,
+    voted_for: String,
+    log: VecDeque<RaftStoredLogEntry>,
+
+    snapshot_index: Index,
+    snapshot_term: Term,
+    snapshot: Vec<u8>,
+
+    log_size: usize,
+}
+
+impl State {
+    /// Create an empty saved instance.
+    fn create() -> Self {
+        Self {
+            current_term: Term(0),
+            voted_for: "".to_owned(),
+            log: VecDeque::new(),
+            snapshot_index: 0,
+            snapshot_term: Term(0),
+            snapshot: vec![],
+            log_size: 0,
+        }
+    }
+
+    /// Append entry and update internal disk usage accounting.
+    fn append_entry(&mut self, entry: RaftStoredLogEntry) {
+        self.log_size += size_of::<RaftStoredLogEntry>();
+        self.log_size += entry.command.len();
+
+        self.log.push_back(entry);
+    }
+
+    /// Returns the total disk usage of stored data. Each scalar type must be
+    /// accounted for here individually.
+    fn total_size(&self) -> usize {
+        self.log_size + self.voted_for.len() + size_of::<Self>()
+    }
+}
+
+/// The shared data that a real implementation would put on disk.
+pub struct InMemoryState(Mutex<State>);
+
+/// The storage interface.
+#[derive(Clone)]
+pub struct InMemoryStorage {
+    locked_state: Arc<InMemoryState>,
+    max_state_bytes: usize,
+}
+
+impl RaftStorageTrait for InMemoryStorage {
+    type RaftStoragePersister<LogEntry: RaftLogEntryRef> = InMemoryState;
+    type RaftStorageMonitor = InMemoryStorageMonitor;
+
+    fn persister<LogEntry>(self) -> Arc<Self::RaftStoragePersister<LogEntry>>
+    where
+        LogEntry: RaftLogEntryRef,
+    {
+        self.locked_state
+    }
+
+    fn read_state(&self) -> std::io::Result<RaftStoredState> {
+        let stored = self.locked_state.0.lock();
+        let snapshot_index = stored.snapshot_index;
+
+        let mut organized_log = vec![];
+        for op in &stored.log {
+            if op.index <= snapshot_index {
+                // Discard all entries at or before the snapshot index.
+                continue;
+            }
+
+            while organized_log
+                .last()
+                .map(|entry: &RaftStoredLogEntry| entry.index >= op.index)
+                .unwrap_or(false)
+            {
+                organized_log.pop();
+            }
+            organized_log.push(RaftStoredLogEntry {
+                index: op.index,
+                term: op.term,
+                command: op.command.clone(),
+            });
+        }
+
+        Ok(RaftStoredState {
+            current_term: stored.current_term,
+            voted_for: stored.voted_for.clone(),
+            log: organized_log,
+            snapshot_index: stored.snapshot_index,
+            snapshot_term: stored.snapshot_term,
+            snapshot: stored.snapshot.clone(),
+        })
+    }
+
+    fn monitor(&self) -> Self::RaftStorageMonitor {
+        InMemoryStorageMonitor {
+            stored: self.locked_state.clone(),
+            max_state_bytes: self.max_state_bytes,
+        }
+    }
+}
+
+/// The storage monitor interface that controls compaction.
+pub struct InMemoryStorageMonitor {
+    stored: Arc<InMemoryState>,
+    max_state_bytes: usize,
+}
+
+impl RaftStorageMonitorTrait for InMemoryStorageMonitor {
+    fn should_compact_log_now(&self) -> bool {
+        let stored = self.stored.0.lock();
+        let total_size = stored.total_size();
+        return total_size > self.max_state_bytes;
+    }
+}
+
+/// The persister implementation backed by the shared in-memory state.
+impl<LogEntry: RaftLogEntryRef> RaftStoragePersisterTrait<LogEntry>
+    for InMemoryState
+{
+    fn save_term_vote(&self, term: Term, voted_for: String) {
+        let mut stored = self.0.lock();
+        stored.current_term = term;
+        stored.voted_for = voted_for;
+    }
+
+    fn append_one_entry(&self, entry: &LogEntry) {
+        let mut stored = self.0.lock();
+        stored.append_entry(RaftStoredLogEntry {
+            index: entry.index(),
+            term: entry.term(),
+            command: entry.command_bytes(),
+        });
+    }
+
+    fn append_entries<'a, LogEntryList>(&self, entries: LogEntryList)
+    where
+        LogEntry: 'a,
+        LogEntryList: IntoIterator<Item = &'a LogEntry>,
+    {
+        let mut stored = self.0.lock();
+        for entry in entries {
+            stored.append_entry(RaftStoredLogEntry {
+                index: entry.index(),
+                term: entry.term(),
+                command: entry.command_bytes(),
+            })
+        }
+    }
+
+    fn update_snapshot(&self, index: Index, term: Term, snapshot: &[u8]) {
+        let mut stored = self.0.lock();
+        stored.snapshot_index = index;
+        stored.snapshot_term = term;
+        stored.snapshot = snapshot.to_vec();
+        while stored
+            .log
+            .front()
+            .map(|e| e.index <= index)
+            .unwrap_or(false)
+        {
+            let entry =
+                stored.log.pop_front().expect("Popping must be successful");
+            stored.log_size -= size_of::<RaftStoredLogEntry>();
+            stored.log_size -= entry.command.len();
+        }
+    }
+}
+
+impl InMemoryStorage {
+    /// Create a new storage with bytes limit.
+    pub fn create(max_state_bytes: usize) -> Self {
+        Self {
+            locked_state: Arc::new(InMemoryState(Mutex::new(State::create()))),
+            max_state_bytes,
+        }
+    }
+
+    /// Save the entire in-memory state.
+    pub fn save(&self) -> State {
+        self.locked_state.0.lock().clone()
+    }
+
+    /// Restore the entire in-memory state, not including `max_state_bytes`.
+    pub fn restore(&self, state: State) {
+        *self.locked_state.0.lock() = state;
+    }
+
+    /// Returns the total bytes cost, not including snapshot.
+    pub fn state_size(&self) -> usize {
+        self.locked_state.0.lock().total_size()
+    }
+
+    /// Returns the bytes cost of the snapshot.
+    pub fn snapshot_size(&self) -> usize {
+        self.locked_state.0.lock().snapshot.len()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::VecDeque;
+    use std::mem::size_of;
+    use std::ops::Deref;
+
+    use parking_lot::Mutex;
+
+    use ruaft::storage::{
+        RaftLogEntryRef, RaftStorageMonitorTrait, RaftStoragePersisterTrait,
+        RaftStorageTrait,
+    };
+    use ruaft::{Index, Term};
+
+    use crate::in_memory_storage::{InMemoryState, State};
+    use crate::InMemoryStorage;
+
+    struct Transaction {
+        index: Index,
+        amount: f64,
+        description: String,
+    }
+
+    impl Transaction {
+        fn populate(index: Index) -> Self {
+            Self {
+                index,
+                amount: index as f64 * 7.0,
+                description: char::from('a' as u8 + index as u8).to_string(),
+            }
+        }
+    }
+
+    impl RaftLogEntryRef for Transaction {
+        fn index(&self) -> Index {
+            self.index
+        }
+
+        fn term(&self) -> Term {
+            Term(self.index / 2)
+        }
+
+        fn command_bytes(&self) -> Vec<u8> {
+            let mut bytes = vec![];
+            bytes.extend(self.index.to_be_bytes());
+            bytes.extend(self.amount.to_be_bytes());
+            bytes.extend(self.description.bytes());
+
+            bytes
+        }
+    }
+
+    fn type_hint(
+        val: &InMemoryState,
+    ) -> &impl RaftStoragePersisterTrait<Transaction> {
+        val
+    }
+
+    #[test]
+    fn test_append() {
+        let state = InMemoryState(Mutex::new(State::create()));
+        state.append_one_entry(&Transaction {
+            index: 0,
+            amount: 0.0,
+            description: "a".to_owned(),
+        });
+
+        state.append_entries(&[
+            Transaction {
+                index: 1,
+                amount: 1.0,
+                description: "test".to_owned(),
+            },
+            Transaction {
+                index: 2,
+                amount: -1.0,
+                description: "another".to_owned(),
+            },
+            Transaction {
+                index: 3,
+                amount: 1.0,
+                description: "test".to_owned(),
+            },
+        ]);
+
+        state.append_one_entry(&Transaction {
+            index: 1,
+            amount: 2.0,
+            description: "".to_owned(),
+        });
+
+        let state = state.0.lock();
+        assert_eq!(0, state.current_term.0);
+        assert!(state.voted_for.is_empty());
+        assert_eq!(0, state.snapshot_index);
+
+        assert!(state.snapshot.is_empty());
+        assert_eq!(296, state.log_size);
+
+        // log
+        assert_eq!(5, state.log.len());
+
+        // log[0]
+        assert_eq!(0, state.log[0].index);
+        assert_eq!(Term(0), state.log[0].term);
+        assert_eq!(
+            vec![
+                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // index
+                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
+                0x61, // "a"
+            ],
+            state.log[0].command
+        );
+
+        // log[1]
+        let entry = &state.log[1];
+        assert_eq!(1, entry.index);
+        assert_eq!(Term(0), entry.term);
+        assert_eq!(
+            vec![
+                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, // index
+                0x3F, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
+                0x74, 0x65, 0x73, 0x74, // "test"
+            ],
+            entry.command
+        );
+
+        // log[2]
+        let entry = &state.log[2];
+        assert_eq!(2, entry.index);
+        assert_eq!(Term(1), entry.term);
+        assert_eq!(
+            vec![
+                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, // index
+                0xBF, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
+                0x61, 0x6E, 0x6F, 0x74, 0x68, 0x65, 0x72 // "another"
+            ],
+            entry.command
+        );
+
+        // log[3]
+        let entry = &state.log[3];
+        assert_eq!(3, entry.index);
+        assert_eq!(Term(1), entry.term);
+        assert_eq!(
+            vec![
+                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, // index
+                0x3F, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
+                0x74, 0x65, 0x73, 0x74, // "test"
+            ],
+            entry.command
+        );
+
+        // log[4]
+        let entry = &state.log[4];
+        assert_eq!(1, entry.index);
+        assert_eq!(Term(0), entry.term);
+        assert_eq!(
+            vec![
+                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, // index
+                0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
+            ],
+            entry.command
+        );
+    }
+
+    #[test]
+    fn test_save_term_vote() {
+        let state = InMemoryState(Mutex::new(State::create()));
+        {
+            let state = state.0.lock();
+            assert_eq!(Term(0), state.current_term);
+            assert!(state.voted_for.is_empty());
+        }
+        type_hint(&state).save_term_vote(Term(9), "hi".to_owned());
+
+        let state = state.0.lock();
+        assert_eq!(Term(9), state.current_term);
+        assert_eq!("hi", &state.voted_for);
+    }
+
+    #[test]
+    fn test_update_snapshot() {
+        let state = InMemoryState(Mutex::new(State::create()));
+        {
+            let state = state.0.lock();
+            assert_eq!(0, state.snapshot_index);
+            assert!(state.snapshot.is_empty());
+        }
+
+        state.append_entries(&[
+            Transaction::populate(0),
+            Transaction::populate(1),
+            Transaction::populate(7),
+            Transaction::populate(8),
+        ]);
+        type_hint(&state).update_snapshot(7, Term(3), &[0x01, 0x02]);
+
+        let state = state.0.lock();
+        assert_eq!(7, state.snapshot_index);
+        assert_eq!(Term(3), state.snapshot_term);
+        assert_eq!(&[0x01, 0x02], state.snapshot.as_slice());
+        // The first 3 entries are removed eagerly.
+        assert_eq!(1, state.log.len());
+    }
+
+    #[test]
+    fn test_read_state() {
+        let storage = InMemoryStorage::create(0);
+        let state = storage.clone().persister::<Transaction>();
+        state.append_entries(&[
+            Transaction::populate(0),
+            Transaction::populate(1),
+            Transaction::populate(2),
+            Transaction::populate(3),
+            Transaction {
+                index: 2,
+                amount: 1.0,
+                description: "hi".to_owned(),
+            },
+            Transaction::populate(4),
+            Transaction::populate(5),
+            Transaction::populate(5),
+            Transaction::populate(5),
+            Transaction::populate(6),
+            Transaction {
+                index: 3,
+                amount: 1.0,
+                description: "hi".to_owned(),
+            },
+            Transaction::populate(7),
+            Transaction::populate(7),
+            Transaction::populate(7),
+        ]);
+        type_hint(&state).save_term_vote(Term(7), "voted_for".to_owned());
+        type_hint(&state).update_snapshot(1, Term(0), &[0x99]);
+
+        let raft_stored_state = storage
+            .read_state()
+            .expect("Read in-memory state should never fail");
+        assert_eq!(Term(7), raft_stored_state.current_term);
+        assert_eq!("voted_for", &raft_stored_state.voted_for);
+        assert_eq!(3, raft_stored_state.log.len());
+        assert_eq!(&[0x99], raft_stored_state.snapshot.as_slice());
+        assert_eq!(1, raft_stored_state.snapshot_index);
+        assert_eq!(Term(0), raft_stored_state.snapshot_term);
+
+        let entry = &raft_stored_state.log[0];
+        assert_eq!(2, entry.index);
+        assert_eq!(Term(1), entry.term);
+        assert_eq!(
+            &[
+                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, // index
+                0x3F, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
+                0x68, 0x69, // "hi"
+            ],
+            entry.command.as_slice()
+        );
+
+        let entry = &raft_stored_state.log[1];
+        assert_eq!(3, entry.index);
+        assert_eq!(Term(1), entry.term);
+        assert_eq!(
+            &[
+                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, // index
+                0x3F, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
+                0x68, 0x69, // "hi"
+            ],
+            entry.command.as_slice()
+        );
+
+        let entry = &raft_stored_state.log[2];
+        assert_eq!(7, entry.index);
+        assert_eq!(Term(3), entry.term);
+        assert_eq!(
+            &[
+                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07, // index
+                0x40, 0x48, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
+                0x68, // "h"
+            ],
+            entry.command.as_slice()
+        );
+
+        assert_eq!(807, state.0.lock().total_size());
+    }
+
+    #[test]
+    fn test_save_restore() {
+        let storage = InMemoryStorage::create(0);
+        let state = storage.clone().persister::<Transaction>();
+        state.append_one_entry(&Transaction {
+            index: 9,
+            amount: 1.0,
+            description: "hello".to_owned(),
+        });
+
+        let saved = storage.save();
+
+        let another_storage = InMemoryStorage::create(100);
+        another_storage.restore(saved);
+
+        assert_eq!(100, another_storage.max_state_bytes);
+        let another_state = another_storage.locked_state.0.lock();
+        let entry = &another_state.log[0];
+        assert_eq!(9, entry.index);
+        assert_eq!(Term(4), entry.term);
+        assert_eq!(
+            vec![
+                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, // index
+                0x3F, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
+                0x68, 0x65, 0x6C, 0x6C, 0x6F, // "hello"
+            ],
+            entry.command
+        );
+    }
+
+    #[test]
+    fn test_total_size() {
+        let state = State::create();
+        assert_eq!(8, size_of::<Term>());
+        assert_eq!(8, size_of::<Index>());
+        assert_eq!(8, size_of::<usize>());
+        assert_eq!(24, size_of::<String>());
+        assert_eq!(24, size_of::<Vec<u8>>());
+        assert_eq!(32, size_of::<VecDeque<u8>>());
+
+        // 112 = 8 + 24 + 32 + 8 + 8 + 24 + 8
+        let empty_size = 112;
+        assert_eq!(empty_size, state.total_size());
+
+        let state = InMemoryState(Mutex::new(State::create()));
+        // command_size = 8 + 8 + 5 = 21
+        // log_size = 8 + 8 + 24 (vec) + command_size = 61
+        state.append_one_entry(&Transaction {
+            index: 9,
+            amount: 1.0,
+            description: "hello".to_owned(),
+        });
+        assert_eq!(61, state.0.lock().log_size);
+        assert_eq!(empty_size + 61, state.0.lock().total_size());
+
+        // total_size() is verified in other tests with complex setup.
+    }
+
+    #[test]
+    fn test_monitor() {
+        let storage = InMemoryStorage::create(150);
+        let state = storage.clone().persister::<Transaction>();
+        let monitor = storage.monitor();
+        assert_eq!(150, monitor.max_state_bytes);
+        assert!(!monitor.should_compact_log_now());
+
+        state.append_one_entry(&Transaction {
+            index: 9,
+            amount: 1.0,
+            description: "hello".to_owned(),
+        });
+        assert_eq!(173, storage.state_size());
+        assert!(monitor.should_compact_log_now());
+
+        let bigger_storage = InMemoryStorage::create(180);
+        bigger_storage.restore(storage.save());
+        assert_eq!(173, bigger_storage.state_size());
+        let bigger_monitor = bigger_storage.monitor();
+        assert!(!bigger_monitor.should_compact_log_now());
+    }
+
+    #[test]
+    fn test_snapshot_size() {
+        let storage = InMemoryStorage::create(0);
+        let state = storage.clone().persister::<Transaction>();
+        {
+            let state = state.0.lock();
+            assert_eq!(0, state.snapshot_index);
+            assert!(state.snapshot.is_empty());
+        }
+
+        type_hint(state.deref()).update_snapshot(7, Term(3), &[0x01, 0x02]);
+        assert_eq!(2, storage.snapshot_size());
+    }
+}

+ 3 - 4
test_configs/src/interceptor/mod.rs

@@ -16,7 +16,7 @@ use ruaft::{
     RequestVoteReply,
 };
 
-use crate::Persister;
+use crate::InMemoryStorage;
 
 type RaftId = usize;
 
@@ -276,9 +276,8 @@ pub fn make_config(server_count: usize, max_state: Option<usize>) -> Config {
         })
         .collect();
     for (index, client_vec) in clients.iter().enumerate() {
-        let persister = Persister::new();
-        let kv_server =
-            KVServer::new(client_vec.to_vec(), index, persister, max_state);
+        let storage = InMemoryStorage::create(max_state.unwrap_or(usize::MAX));
+        let kv_server = KVServer::new(client_vec.to_vec(), index, storage);
         kv_servers.push(kv_server);
     }
 

+ 14 - 23
test_configs/src/kvraft/config.rs

@@ -8,7 +8,7 @@ use rand::thread_rng;
 use kvraft::Clerk;
 use kvraft::KVServer;
 
-use crate::{register_kv_server, register_server, Persister, RpcClient};
+use crate::{register_kv_server, register_server, InMemoryStorage, RpcClient};
 
 struct ConfigState {
     kv_servers: Vec<Option<Arc<KVServer>>>,
@@ -19,7 +19,7 @@ pub struct Config {
     network: Arc<Mutex<labrpc::Network>>,
     server_count: usize,
     state: Mutex<ConfigState>,
-    storage: Mutex<Vec<Option<Persister>>>,
+    storage: Mutex<Vec<InMemoryStorage>>,
     maxraftstate: usize,
 }
 
@@ -52,12 +52,9 @@ impl Config {
             }
         }
 
-        let persister = self.storage.lock()[index]
-            .take()
-            .expect("A persister must be present to create a raft server");
+        let storage = self.storage.lock()[index].clone();
 
-        let kv =
-            KVServer::new(clients, index, persister, Some(self.maxraftstate));
+        let kv = KVServer::new(clients, index, storage);
         self.state.lock().kv_servers[index].replace(kv.clone());
 
         let raft = kv.raft().clone();
@@ -153,13 +150,12 @@ impl Config {
         }
 
         if let Some(kv_server) = self.state.lock().kv_servers[index].take() {
-            let persister = kv_server.raft().persister();
-            let data = Persister::downcast_unsafe(persister.as_ref()).read();
+            let data = self.storage.lock()[index].save();
             kv_server.kill();
 
-            let persister = Persister::new();
-            persister.restore(data);
-            self.storage.lock()[index] = Some(persister);
+            let storage = InMemoryStorage::create(self.maxraftstate);
+            storage.restore(data);
+            self.storage.lock()[index] = storage;
         }
     }
 
@@ -253,16 +249,11 @@ impl Config {
     fn check_size(
         &self,
         upper: usize,
-        size_fn: impl Fn(&Persister) -> usize,
+        size_fn: impl Fn(&InMemoryStorage) -> usize,
     ) -> Result<(), String> {
         let mut over_limits = String::new();
-        for (index, p) in self.state.lock().kv_servers.iter().enumerate() {
-            let p = p
-                .as_ref()
-                .expect("KV server must be running to check size")
-                .raft()
-                .persister();
-            let size = size_fn(Persister::downcast_unsafe(p.as_ref()));
+        for (index, storage) in self.storage.lock().iter().enumerate() {
+            let size = size_fn(storage);
             if size > upper {
                 let str = format!(" (index {}, size {})", index, size);
                 over_limits.push_str(&str);
@@ -278,11 +269,11 @@ impl Config {
     }
 
     pub fn check_log_size(&self, upper: usize) -> Result<(), String> {
-        self.check_size(upper, ruaft::Persister::state_size)
+        self.check_size(upper, InMemoryStorage::state_size)
     }
 
     pub fn check_snapshot_size(&self, upper: usize) -> Result<(), String> {
-        self.check_size(upper, Persister::snapshot_size)
+        self.check_size(upper, InMemoryStorage::snapshot_size)
     }
 }
 
@@ -306,7 +297,7 @@ pub fn make_config(
     let storage = Mutex::new(vec![]);
     storage
         .lock()
-        .resize_with(server_count, || Some(Persister::new()));
+        .resize_with(server_count, || InMemoryStorage::create(maxraftstate));
 
     let cfg = Config {
         network,

+ 2 - 2
test_configs/src/lib.rs

@@ -1,11 +1,11 @@
 #![allow(clippy::uninlined_format_args)]
 
+mod in_memory_storage;
 pub mod interceptor;
 pub mod kvraft;
-mod persister;
 pub mod raft;
 mod rpcs;
 pub mod utils;
 
-pub use persister::Persister;
+pub use in_memory_storage::InMemoryStorage;
 use rpcs::{register_kv_server, register_server, RpcClient};

+ 0 - 66
test_configs/src/persister.rs

@@ -1,66 +0,0 @@
-use parking_lot::Mutex;
-
-#[derive(Clone)]
-pub struct State {
-    pub bytes: bytes::Bytes,
-    pub snapshot: Vec<u8>,
-}
-
-pub struct Persister {
-    state: Mutex<State>,
-}
-
-impl Persister {
-    pub fn new() -> Self {
-        Self {
-            state: Mutex::new(State {
-                bytes: bytes::Bytes::new(),
-                snapshot: vec![],
-            }),
-        }
-    }
-}
-
-impl Default for Persister {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl ruaft::Persister for Persister {
-    fn read_state(&self) -> bytes::Bytes {
-        self.state.lock().bytes.clone()
-    }
-
-    fn save_state(&self, data: bytes::Bytes) {
-        self.state.lock().bytes = data;
-    }
-
-    fn state_size(&self) -> usize {
-        self.state.lock().bytes.len()
-    }
-
-    fn save_snapshot_and_state(&self, state: bytes::Bytes, snapshot: &[u8]) {
-        let mut this = self.state.lock();
-        this.bytes = state;
-        this.snapshot = snapshot.to_vec();
-    }
-}
-
-impl Persister {
-    pub fn read(&self) -> State {
-        self.state.lock().clone()
-    }
-
-    pub fn restore(&self, state: State) {
-        *self.state.lock() = state;
-    }
-
-    pub fn snapshot_size(&self) -> usize {
-        self.state.lock().snapshot.len()
-    }
-
-    pub fn downcast_unsafe(trait_obj: &dyn ruaft::Persister) -> &Self {
-        unsafe { &*(trait_obj as *const dyn ruaft::Persister as *const Self) }
-    }
-}

+ 10 - 12
test_configs/src/raft/config.rs

@@ -8,10 +8,11 @@ use anyhow::{anyhow, bail};
 use parking_lot::Mutex;
 use rand::{thread_rng, Rng};
 
-use ruaft::{ApplyCommandMessage, Persister, Raft, Term};
+use ruaft::{ApplyCommandMessage, Raft, Term};
 
 use crate::register_server;
 use crate::utils::{sleep_millis, NO_SNAPSHOT};
+use crate::InMemoryStorage;
 
 struct ConfigState {
     rafts: Vec<Option<Raft<i32>>>,
@@ -22,7 +23,7 @@ struct LogState {
     committed_logs: Vec<Vec<i32>>,
     results: Vec<Result<()>>,
     max_index: usize,
-    saved: Vec<Option<crate::Persister>>,
+    saved: Vec<InMemoryStorage>,
 }
 
 pub struct Config {
@@ -303,13 +304,13 @@ impl Config {
         // the apply command function.
         let Some(raft) = raft else { return };
 
-        let data = raft.persister().read_state();
+        let data = self.log.lock().saved[index].save();
         raft.kill().join();
 
         let mut log = self.log.lock();
-        let persister = crate::Persister::new();
-        persister.save_state(data);
-        log.saved[index] = Some(persister);
+        let storage = InMemoryStorage::create(usize::MAX);
+        storage.restore(data);
+        log.saved[index] = storage;
     }
 
     pub fn start1(&self, index: usize) -> Result<()> {
@@ -327,17 +328,14 @@ impl Config {
                 )))
             }
         }
-        let persister = self.log.lock().saved[index]
-            .take()
-            .expect("A persister must be present to create a raft server");
+        let storage = self.log.lock().saved[index].clone();
 
         let log = self.log.clone();
         let raft = Raft::new(
             clients,
             index,
-            persister,
+            storage,
             move |message| Self::apply_command(log.clone(), index, message),
-            None,
             NO_SNAPSHOT,
         );
         self.state.lock().rafts[index].replace(raft.clone());
@@ -493,7 +491,7 @@ pub fn make_config(
     });
 
     let mut saved = vec![];
-    saved.resize_with(server_count, || Some(crate::Persister::new()));
+    saved.resize_with(server_count, || InMemoryStorage::create(usize::MAX));
     let log = Arc::new(Mutex::new(LogState {
         committed_logs: vec![vec![]; server_count],
         results: vec![],

+ 2 - 3
test_configs/src/rpcs.rs

@@ -216,7 +216,7 @@ pub fn register_kv_server<
 
 #[cfg(test)]
 mod tests {
-    use ruaft::utils::do_nothing::DoNothingPersister;
+    use ruaft::utils::do_nothing::DoNothingRaftStorage;
     use ruaft::utils::integration_test::{
         make_append_entries_args, make_request_vote_args,
         unpack_append_entries_reply, unpack_request_vote_reply,
@@ -240,9 +240,8 @@ mod tests {
             let raft = Raft::new(
                 vec![RpcClient(client)],
                 0,
-                DoNothingPersister,
+                DoNothingRaftStorage,
                 |_: ApplyCommandMessage<i32>| {},
-                None,
                 crate::utils::NO_SNAPSHOT,
             );
             register_server(raft, name, network.as_ref())?;