| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597 |
//! An in-memory simulation of storage operations.
- use std::collections::VecDeque;
- use std::mem::size_of;
- use std::sync::Arc;
- use parking_lot::Mutex;
- use ruaft::storage::{
- RaftLogEntryRef, RaftStorageMonitorTrait, RaftStoragePersisterTrait,
- RaftStorageTrait, RaftStoredLogEntry, RaftStoredState,
- };
- use ruaft::{Index, Term};
- #[derive(Clone)]
- pub struct State {
- current_term: Term,
- voted_for: String,
- log: VecDeque<RaftStoredLogEntry>,
- snapshot_index: Index,
- snapshot_term: Term,
- snapshot: Vec<u8>,
- log_size: usize,
- }
- impl State {
- /// Create an empty saved instance.
- fn create() -> Self {
- Self {
- current_term: Term(0),
- voted_for: "".to_owned(),
- log: VecDeque::new(),
- snapshot_index: 0,
- snapshot_term: Term(0),
- snapshot: vec![],
- log_size: 0,
- }
- }
- /// Append entry and update internal disk usage accounting.
- fn append_entry(&mut self, entry: RaftStoredLogEntry) {
- self.log_size += size_of::<RaftStoredLogEntry>();
- self.log_size += entry.command.len();
- self.log.push_back(entry);
- }
- /// Returns the total disk usage of stored data. Each scala type must be
- /// accounted here individually.
- fn total_size(&self) -> usize {
- self.log_size + self.voted_for.len() + size_of::<Self>()
- }
- }
- /// The shared data that should be put on disk.
- pub struct InMemoryState(Mutex<State>);
- /// The storage interface.
- #[derive(Clone)]
- pub struct InMemoryStorage {
- locked_state: Arc<InMemoryState>,
- max_state_bytes: usize,
- }
- impl RaftStorageTrait for InMemoryStorage {
- type RaftStoragePersister<LogEntry: RaftLogEntryRef> = InMemoryState;
- type RaftStorageMonitor = InMemoryStorageMonitor;
- fn persister<LogEntry>(self) -> Arc<Self::RaftStoragePersister<LogEntry>>
- where
- LogEntry: RaftLogEntryRef,
- {
- self.locked_state
- }
- fn read_state(&self) -> std::io::Result<RaftStoredState> {
- let stored = self.locked_state.0.lock();
- let snapshot_index = stored.snapshot_index;
- let mut organized_log = vec![];
- for op in &stored.log {
- if op.index <= snapshot_index {
- // Discard all entries that are before snapshot index.
- continue;
- }
- while organized_log
- .last()
- .map(|entry: &RaftStoredLogEntry| entry.index >= op.index)
- .unwrap_or(false)
- {
- organized_log.pop();
- }
- organized_log.push(RaftStoredLogEntry {
- index: op.index,
- term: op.term,
- command: op.command.clone(),
- });
- }
- Ok(RaftStoredState {
- current_term: stored.current_term,
- voted_for: stored.voted_for.clone(),
- log: organized_log,
- snapshot_index: stored.snapshot_index,
- snapshot_term: stored.snapshot_term,
- snapshot: stored.snapshot.clone(),
- })
- }
- fn monitor(&self) -> Self::RaftStorageMonitor {
- InMemoryStorageMonitor {
- stored: self.locked_state.clone(),
- max_state_bytes: self.max_state_bytes,
- }
- }
- }
- /// The storage monitor interface and controls compaction.
- pub struct InMemoryStorageMonitor {
- stored: Arc<InMemoryState>,
- max_state_bytes: usize,
- }
- impl RaftStorageMonitorTrait for InMemoryStorageMonitor {
- fn should_compact_log_now(&self) -> bool {
- let stored = self.stored.0.lock();
- let total_size = stored.total_size();
- return total_size > self.max_state_bytes;
- }
- }
- /// The persister interface that implements the logic.
- impl<LogEntry: RaftLogEntryRef> RaftStoragePersisterTrait<LogEntry>
- for InMemoryState
- {
- fn save_term_vote(&self, term: Term, voted_for: String) {
- let mut stored = self.0.lock();
- stored.current_term = term;
- stored.voted_for = voted_for;
- }
- fn append_one_entry(&self, entry: &LogEntry) {
- let mut stored = self.0.lock();
- stored.append_entry(RaftStoredLogEntry {
- index: entry.index(),
- term: entry.term(),
- command: entry.command_bytes(),
- });
- }
- fn append_entries<'a, LogEntryList>(&self, entries: LogEntryList)
- where
- LogEntry: 'a,
- LogEntryList: IntoIterator<Item = &'a LogEntry>,
- {
- let mut stored = self.0.lock();
- for entry in entries {
- stored.append_entry(RaftStoredLogEntry {
- index: entry.index(),
- term: entry.term(),
- command: entry.command_bytes(),
- })
- }
- }
- fn update_snapshot(&self, index: Index, term: Term, snapshot: &[u8]) {
- let mut stored = self.0.lock();
- stored.snapshot_index = index;
- stored.snapshot_term = term;
- stored.snapshot = snapshot.to_vec();
- while stored
- .log
- .front()
- .map(|e| e.index <= index)
- .unwrap_or(false)
- {
- let entry =
- stored.log.pop_front().expect("Popping must be successful");
- stored.log_size -= size_of::<RaftStoredLogEntry>();
- stored.log_size -= entry.command.len();
- }
- }
- }
- impl InMemoryStorage {
- /// Create a new storage with bytes limit.
- pub fn create(max_state_bytes: usize) -> Self {
- Self {
- locked_state: Arc::new(InMemoryState(Mutex::new(State::create()))),
- max_state_bytes,
- }
- }
- /// Save the entire in-memory state.
- pub fn save(&self) -> State {
- self.locked_state.0.lock().clone()
- }
- /// Restore the entire in-memory state, not including `max_state_bytes`.
- pub fn restore(&self, state: State) {
- *self.locked_state.0.lock() = state;
- }
- /// Returns the total bytes cost, not including snapshot.
- pub fn state_size(&self) -> usize {
- self.locked_state.0.lock().total_size()
- }
- /// Returns the bytes cost of the snapshot.
- pub fn snapshot_size(&self) -> usize {
- self.locked_state.0.lock().snapshot.len()
- }
- }
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::mem::size_of;
    use std::ops::Deref;
    use parking_lot::Mutex;
    use ruaft::storage::{
        RaftLogEntryRef, RaftStorageMonitorTrait, RaftStoragePersisterTrait,
        RaftStorageTrait,
    };
    use ruaft::{Index, Term};
    use crate::in_memory_storage::{InMemoryState, State};
    use crate::InMemoryStorage;

    // Toy log entry. Its command encodes as
    // index (big-endian) ++ amount (big-endian f64) ++ description bytes.
    struct Transaction {
        index: Index,
        amount: f64,
        description: String,
    }

    impl Transaction {
        // Deterministic entry: amount = 7 * index, and a one-letter
        // description 'a' + index (only meaningful for small indices).
        fn populate(index: Index) -> Self {
            Self {
                index,
                amount: index as f64 * 7.0,
                description: char::from('a' as u8 + index as u8).to_string(),
            }
        }
    }

    impl RaftLogEntryRef for Transaction {
        fn index(&self) -> Index {
            self.index
        }

        // Term is index / 2, so pairs of consecutive indices share a term.
        fn term(&self) -> Term {
            Term(self.index / 2)
        }

        fn command_bytes(&self) -> Vec<u8> {
            let mut bytes = vec![];
            bytes.extend(self.index.to_be_bytes());
            bytes.extend(self.amount.to_be_bytes());
            bytes.extend(self.description.bytes());
            bytes
        }
    }

    // Pins the persister's `LogEntry` type parameter to `Transaction` for
    // calls (e.g. `save_term_vote`, `update_snapshot`) whose arguments do not
    // mention the entry type, so inference cannot pick it.
    fn type_hint(
        val: &InMemoryState,
    ) -> &impl RaftStoragePersisterTrait<Transaction> {
        val
    }

    #[test]
    fn test_append() {
        let state = InMemoryState(Mutex::new(State::create()));
        state.append_one_entry(&Transaction {
            index: 0,
            amount: 0.0,
            description: "a".to_owned(),
        });
        state.append_entries(&[
            Transaction {
                index: 1,
                amount: 1.0,
                description: "test".to_owned(),
            },
            Transaction {
                index: 2,
                amount: -1.0,
                description: "another".to_owned(),
            },
            Transaction {
                index: 3,
                amount: 1.0,
                description: "test".to_owned(),
            },
        ]);
        // Index 1 is written again after index 3; the raw log keeps both
        // copies in append order (resolution happens only in `read_state`).
        state.append_one_entry(&Transaction {
            index: 1,
            amount: 2.0,
            description: "".to_owned(),
        });

        let state = state.0.lock();
        assert_eq!(0, state.current_term.0);
        assert!(state.voted_for.is_empty());
        assert_eq!(0, state.snapshot_index);
        assert!(state.snapshot.is_empty());
        // 5 entries * 40 bytes (size_of::<RaftStoredLogEntry>()) = 200, plus
        // command bytes 17 + 20 + 23 + 20 + 16 = 96.
        assert_eq!(296, state.log_size);
        // log
        assert_eq!(5, state.log.len());
        // log[0]
        assert_eq!(0, state.log[0].index);
        assert_eq!(Term(0), state.log[0].term);
        assert_eq!(
            vec![
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // index
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
                0x61, // "a"
            ],
            state.log[0].command
        );
        // log[1]
        let entry = &state.log[1];
        assert_eq!(1, entry.index);
        assert_eq!(Term(0), entry.term);
        assert_eq!(
            vec![
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, // index
                0x3F, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
                0x74, 0x65, 0x73, 0x74, // "test"
            ],
            entry.command
        );
        // log[2]
        let entry = &state.log[2];
        assert_eq!(2, entry.index);
        assert_eq!(Term(1), entry.term);
        assert_eq!(
            vec![
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, // index
                0xBF, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
                0x61, 0x6E, 0x6F, 0x74, 0x68, 0x65, 0x72 // "another"
            ],
            entry.command
        );
        // log[3]
        let entry = &state.log[3];
        assert_eq!(3, entry.index);
        assert_eq!(Term(1), entry.term);
        assert_eq!(
            vec![
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, // index
                0x3F, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
                0x74, 0x65, 0x73, 0x74, // "test"
            ],
            entry.command
        );
        // log[4]: the second write at index 1 (empty description).
        let entry = &state.log[4];
        assert_eq!(1, entry.index);
        assert_eq!(Term(0), entry.term);
        assert_eq!(
            vec![
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, // index
                0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
            ],
            entry.command
        );
    }

    #[test]
    fn test_save_term_vote() {
        let state = InMemoryState(Mutex::new(State::create()));
        {
            let state = state.0.lock();
            assert_eq!(Term(0), state.current_term);
            assert!(state.voted_for.is_empty());
        }

        type_hint(&state).save_term_vote(Term(9), "hi".to_owned());

        let state = state.0.lock();
        assert_eq!(Term(9), state.current_term);
        assert_eq!("hi", &state.voted_for);
    }

    #[test]
    fn test_update_snapshot() {
        let state = InMemoryState(Mutex::new(State::create()));
        {
            let state = state.0.lock();
            assert_eq!(0, state.snapshot_index);
            assert!(state.snapshot.is_empty());
        }
        state.append_entries(&[
            Transaction::populate(0),
            Transaction::populate(1),
            Transaction::populate(7),
            Transaction::populate(8),
        ]);

        type_hint(&state).update_snapshot(7, Term(3), &[0x01, 0x02]);

        let state = state.0.lock();
        assert_eq!(7, state.snapshot_index);
        assert_eq!(Term(3), state.snapshot_term);
        assert_eq!(&[0x01, 0x02], state.snapshot.as_slice());
        // The first 3 entries are removed eagerly.
        assert_eq!(1, state.log.len());
    }

    #[test]
    fn test_read_state() {
        let storage = InMemoryStorage::create(0);
        let state = storage.clone().persister::<Transaction>();
        // The "hi" writes at index 2 and 3 simulate log overwrites: each one
        // must truncate everything previously kept at or above its index.
        state.append_entries(&[
            Transaction::populate(0),
            Transaction::populate(1),
            Transaction::populate(2),
            Transaction::populate(3),
            Transaction {
                index: 2,
                amount: 1.0,
                description: "hi".to_owned(),
            },
            Transaction::populate(4),
            Transaction::populate(5),
            Transaction::populate(5),
            Transaction::populate(5),
            Transaction::populate(6),
            Transaction {
                index: 3,
                amount: 1.0,
                description: "hi".to_owned(),
            },
            Transaction::populate(7),
            Transaction::populate(7),
            Transaction::populate(7),
        ]);
        type_hint(&state).save_term_vote(Term(7), "voted_for".to_owned());
        type_hint(&state).update_snapshot(1, Term(0), &[0x99]);

        let raft_stored_state = storage
            .read_state()
            .expect("Read in-memory state should never fail");
        assert_eq!(Term(7), raft_stored_state.current_term);
        assert_eq!("voted_for", &raft_stored_state.voted_for);
        // Indices <= 1 are dropped. The second index-2 write replaces 2 and 3,
        // and the index-3 write replaces 4, 5 and 6. What survives is
        // [2 ("hi"), 3 ("hi"), 7].
        assert_eq!(3, raft_stored_state.log.len());
        assert_eq!(&[0x99], raft_stored_state.snapshot.as_slice());
        assert_eq!(1, raft_stored_state.snapshot_index);
        assert_eq!(Term(0), raft_stored_state.snapshot_term);

        let entry = &raft_stored_state.log[0];
        assert_eq!(2, entry.index);
        assert_eq!(Term(1), entry.term);
        assert_eq!(
            &[
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, // index
                0x3F, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
                0x68, 0x69, // "hi"
            ],
            entry.command.as_slice()
        );

        let entry = &raft_stored_state.log[1];
        assert_eq!(3, entry.index);
        assert_eq!(Term(1), entry.term);
        assert_eq!(
            &[
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, // index
                0x3F, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
                0x68, 0x69, // "hi"
            ],
            entry.command.as_slice()
        );

        let entry = &raft_stored_state.log[2];
        assert_eq!(7, entry.index);
        assert_eq!(Term(3), entry.term);
        assert_eq!(
            &[
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07, // index
                0x40, 0x48, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, // amount (49.0)
                0x68, // "h"
            ],
            entry.command.as_slice()
        );

        // Raw log: 14 entries * 40 + 12 * 17 + 2 * 18 = 800 bytes. The
        // snapshot at index 1 eagerly drops the two leading entries
        // (2 * 57 = 114), leaving 686. Adding voted_for (9) and
        // size_of::<State>() (112) gives 807.
        assert_eq!(807, state.0.lock().total_size());
    }

    #[test]
    fn test_save_restore() {
        let storage = InMemoryStorage::create(0);
        let state = storage.clone().persister::<Transaction>();
        state.append_one_entry(&Transaction {
            index: 9,
            amount: 1.0,
            description: "hello".to_owned(),
        });

        let saved = storage.save();
        let another_storage = InMemoryStorage::create(100);
        another_storage.restore(saved);
        // restore() must not touch the byte limit of the target storage.
        assert_eq!(100, another_storage.max_state_bytes);

        let another_state = another_storage.locked_state.0.lock();
        let entry = &another_state.log[0];
        assert_eq!(9, entry.index);
        assert_eq!(Term(4), entry.term);
        assert_eq!(
            vec![
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, // index
                0x3F, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
                0x68, 0x65, 0x6C, 0x6C, 0x6F, // "hello"
            ],
            entry.command
        );
    }

    #[test]
    fn test_total_size() {
        let state = State::create();
        // Layout assumptions the size expectations below depend on (64-bit).
        assert_eq!(8, size_of::<Term>());
        assert_eq!(8, size_of::<Index>());
        assert_eq!(8, size_of::<usize>());
        assert_eq!(24, size_of::<String>());
        assert_eq!(24, size_of::<Vec<u8>>());
        assert_eq!(32, size_of::<VecDeque<u8>>());
        // 112 = 8 + 24 + 32 + 8 + 8 + 24 + 8
        let empty_size = 112;
        assert_eq!(empty_size, state.total_size());

        let state = InMemoryState(Mutex::new(State::create()));
        // command_size = 8 + 8 + 5 = 21
        // log_size = 8 + 8 + 24 (vec) + command_size = 61
        state.append_one_entry(&Transaction {
            index: 9,
            amount: 1.0,
            description: "hello".to_owned(),
        });
        assert_eq!(61, state.0.lock().log_size);
        assert_eq!(empty_size + 61, state.0.lock().total_size());
        // total_size() is verified in other tests with complex setup.
    }

    #[test]
    fn test_monitor() {
        let storage = InMemoryStorage::create(150);
        let state = storage.clone().persister::<Transaction>();
        let monitor = storage.monitor();
        assert_eq!(150, monitor.max_state_bytes);
        // Empty state is 112 bytes, below the 150-byte limit.
        assert!(!monitor.should_compact_log_now());

        state.append_one_entry(&Transaction {
            index: 9,
            amount: 1.0,
            description: "hello".to_owned(),
        });
        // 112 + 61 = 173 > 150, so compaction is requested.
        assert_eq!(173, storage.state_size());
        assert!(monitor.should_compact_log_now());

        // Same data under a 180-byte limit does not need compaction.
        let bigger_storage = InMemoryStorage::create(180);
        bigger_storage.restore(storage.save());
        assert_eq!(173, bigger_storage.state_size());
        let bigger_monitor = bigger_storage.monitor();
        assert!(!bigger_monitor.should_compact_log_now());
    }

    #[test]
    fn test_snapshot_size() {
        let storage = InMemoryStorage::create(0);
        let state = storage.clone().persister::<Transaction>();
        {
            let state = state.0.lock();
            assert_eq!(0, state.snapshot_index);
            assert!(state.snapshot.is_empty());
        }

        type_hint(state.deref()).update_snapshot(7, Term(3), &[0x01, 0x02]);
        assert_eq!(2, storage.snapshot_size());
    }
}
|