|
@@ -1,4 +1,5 @@
|
|
|
/// A in-memory simulation of storage operations.
|
|
/// A in-memory simulation of storage operations.
|
|
|
|
|
+use std::collections::VecDeque;
|
|
|
use std::mem::size_of;
|
|
use std::mem::size_of;
|
|
|
use std::sync::Arc;
|
|
use std::sync::Arc;
|
|
|
|
|
|
|
@@ -14,7 +15,7 @@ use ruaft::{Index, Term};
|
|
|
pub struct State {
|
|
pub struct State {
|
|
|
current_term: Term,
|
|
current_term: Term,
|
|
|
voted_for: String,
|
|
voted_for: String,
|
|
|
- log: Vec<RaftStoredLogEntry>,
|
|
|
|
|
|
|
+ log: VecDeque<RaftStoredLogEntry>,
|
|
|
|
|
|
|
|
snapshot_index: Index,
|
|
snapshot_index: Index,
|
|
|
snapshot_term: Term,
|
|
snapshot_term: Term,
|
|
@@ -29,7 +30,7 @@ impl State {
|
|
|
Self {
|
|
Self {
|
|
|
current_term: Term(0),
|
|
current_term: Term(0),
|
|
|
voted_for: "".to_owned(),
|
|
voted_for: "".to_owned(),
|
|
|
- log: vec![],
|
|
|
|
|
|
|
+ log: VecDeque::new(),
|
|
|
snapshot_index: 0,
|
|
snapshot_index: 0,
|
|
|
snapshot_term: Term(0),
|
|
snapshot_term: Term(0),
|
|
|
snapshot: vec![],
|
|
snapshot: vec![],
|
|
@@ -42,7 +43,7 @@ impl State {
|
|
|
self.log_size += size_of::<RaftStoredLogEntry>();
|
|
self.log_size += size_of::<RaftStoredLogEntry>();
|
|
|
self.log_size += entry.command.len();
|
|
self.log_size += entry.command.len();
|
|
|
|
|
|
|
|
- self.log.push(entry);
|
|
|
|
|
|
|
+ self.log.push_back(entry);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/// Returns the total disk usage of stored data. Each scala type must be
|
|
/// Returns the total disk usage of stored data. Each scala type must be
|
|
@@ -169,6 +170,17 @@ impl<LogEntry: RaftLogEntryRef> RaftStoragePersisterTrait<LogEntry>
|
|
|
stored.snapshot_index = index;
|
|
stored.snapshot_index = index;
|
|
|
stored.snapshot_term = term;
|
|
stored.snapshot_term = term;
|
|
|
stored.snapshot = snapshot.to_vec();
|
|
stored.snapshot = snapshot.to_vec();
|
|
|
|
|
+ while stored
|
|
|
|
|
+ .log
|
|
|
|
|
+ .front()
|
|
|
|
|
+ .map(|e| e.index <= index)
|
|
|
|
|
+ .unwrap_or(false)
|
|
|
|
|
+ {
|
|
|
|
|
+ let entry =
|
|
|
|
|
+ stored.log.pop_front().expect("Popping must be successful");
|
|
|
|
|
+ stored.log_size -= size_of::<RaftStoredLogEntry>();
|
|
|
|
|
+ stored.log_size -= entry.command.len();
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -204,6 +216,7 @@ impl InMemoryStorage {
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
#[cfg(test)]
|
|
|
mod tests {
|
|
mod tests {
|
|
|
|
|
+ use std::collections::VecDeque;
|
|
|
use std::mem::size_of;
|
|
use std::mem::size_of;
|
|
|
use std::ops::Deref;
|
|
use std::ops::Deref;
|
|
|
|
|
|
|
@@ -391,12 +404,20 @@ mod tests {
|
|
|
assert!(state.snapshot.is_empty());
|
|
assert!(state.snapshot.is_empty());
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ state.append_entries(&[
|
|
|
|
|
+ Transaction::populate(0),
|
|
|
|
|
+ Transaction::populate(1),
|
|
|
|
|
+ Transaction::populate(7),
|
|
|
|
|
+ Transaction::populate(8),
|
|
|
|
|
+ ]);
|
|
|
type_hint(&state).update_snapshot(7, Term(3), &[0x01, 0x02]);
|
|
type_hint(&state).update_snapshot(7, Term(3), &[0x01, 0x02]);
|
|
|
|
|
|
|
|
let state = state.0.lock();
|
|
let state = state.0.lock();
|
|
|
assert_eq!(7, state.snapshot_index);
|
|
assert_eq!(7, state.snapshot_index);
|
|
|
assert_eq!(Term(3), state.snapshot_term);
|
|
assert_eq!(Term(3), state.snapshot_term);
|
|
|
assert_eq!(&[0x01, 0x02], state.snapshot.as_slice());
|
|
assert_eq!(&[0x01, 0x02], state.snapshot.as_slice());
|
|
|
|
|
+ // The first 3 entries are removed eagerly.
|
|
|
|
|
+ assert_eq!(1, state.log.len());
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
#[test]
|
|
@@ -476,7 +497,7 @@ mod tests {
|
|
|
entry.command.as_slice()
|
|
entry.command.as_slice()
|
|
|
);
|
|
);
|
|
|
|
|
|
|
|
- assert_eq!(905, state.0.lock().total_size());
|
|
|
|
|
|
|
+ assert_eq!(807, state.0.lock().total_size());
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
#[test]
|
|
@@ -517,9 +538,11 @@ mod tests {
|
|
|
assert_eq!(8, size_of::<usize>());
|
|
assert_eq!(8, size_of::<usize>());
|
|
|
assert_eq!(24, size_of::<String>());
|
|
assert_eq!(24, size_of::<String>());
|
|
|
assert_eq!(24, size_of::<Vec<u8>>());
|
|
assert_eq!(24, size_of::<Vec<u8>>());
|
|
|
|
|
+ assert_eq!(32, size_of::<VecDeque<u8>>());
|
|
|
|
|
|
|
|
- // 104 = 8 + 24 + 24 + 8 + 24 + 8
|
|
|
|
|
- assert_eq!(96, state.total_size());
|
|
|
|
|
|
|
+ // 112 = 8 + 24 + 32 + 8 + 8 + 24 + 8
|
|
|
|
|
+ let empty_size = 112;
|
|
|
|
|
+ assert_eq!(empty_size, state.total_size());
|
|
|
|
|
|
|
|
let state = InMemoryState(Mutex::new(State::create()));
|
|
let state = InMemoryState(Mutex::new(State::create()));
|
|
|
// command_size = 8 + 8 + 5 = 21
|
|
// command_size = 8 + 8 + 5 = 21
|
|
@@ -530,7 +553,7 @@ mod tests {
|
|
|
description: "hello".to_owned(),
|
|
description: "hello".to_owned(),
|
|
|
});
|
|
});
|
|
|
assert_eq!(61, state.0.lock().log_size);
|
|
assert_eq!(61, state.0.lock().log_size);
|
|
|
- assert_eq!(96 + 61, state.0.lock().total_size());
|
|
|
|
|
|
|
+ assert_eq!(empty_size + 61, state.0.lock().total_size());
|
|
|
|
|
|
|
|
// total_size() is verified in other tests with complex setup.
|
|
// total_size() is verified in other tests with complex setup.
|
|
|
}
|
|
}
|
|
@@ -548,12 +571,12 @@ mod tests {
|
|
|
amount: 1.0,
|
|
amount: 1.0,
|
|
|
description: "hello".to_owned(),
|
|
description: "hello".to_owned(),
|
|
|
});
|
|
});
|
|
|
- assert_eq!(157, storage.state_size());
|
|
|
|
|
|
|
+ assert_eq!(173, storage.state_size());
|
|
|
assert!(monitor.should_compact_log_now());
|
|
assert!(monitor.should_compact_log_now());
|
|
|
|
|
|
|
|
- let bigger_storage = InMemoryStorage::create(160);
|
|
|
|
|
|
|
+ let bigger_storage = InMemoryStorage::create(180);
|
|
|
bigger_storage.restore(storage.save());
|
|
bigger_storage.restore(storage.save());
|
|
|
- assert_eq!(157, bigger_storage.state_size());
|
|
|
|
|
|
|
+ assert_eq!(173, bigger_storage.state_size());
|
|
|
let bigger_monitor = bigger_storage.monitor();
|
|
let bigger_monitor = bigger_storage.monitor();
|
|
|
assert!(!bigger_monitor.should_compact_log_now());
|
|
assert!(!bigger_monitor.should_compact_log_now());
|
|
|
}
|
|
}
|