in_memory_storage.rs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597
  //! An in-memory simulation of storage operations.
  2. use std::collections::VecDeque;
  3. use std::mem::size_of;
  4. use std::sync::Arc;
  5. use parking_lot::Mutex;
  6. use ruaft::storage::{
  7. RaftLogEntryRef, RaftStorageMonitorTrait, RaftStoragePersisterTrait,
  8. RaftStorageTrait, RaftStoredLogEntry, RaftStoredState,
  9. };
  10. use ruaft::{Index, Term};
  11. #[derive(Clone)]
  12. pub struct State {
  13. current_term: Term,
  14. voted_for: String,
  15. log: VecDeque<RaftStoredLogEntry>,
  16. snapshot_index: Index,
  17. snapshot_term: Term,
  18. snapshot: Vec<u8>,
  19. log_size: usize,
  20. }
  21. impl State {
  22. /// Create an empty saved instance.
  23. fn create() -> Self {
  24. Self {
  25. current_term: Term(0),
  26. voted_for: "".to_owned(),
  27. log: VecDeque::new(),
  28. snapshot_index: 0,
  29. snapshot_term: Term(0),
  30. snapshot: vec![],
  31. log_size: 0,
  32. }
  33. }
  34. /// Append entry and update internal disk usage accounting.
  35. fn append_entry(&mut self, entry: RaftStoredLogEntry) {
  36. self.log_size += size_of::<RaftStoredLogEntry>();
  37. self.log_size += entry.command.len();
  38. self.log.push_back(entry);
  39. }
  40. /// Returns the total disk usage of stored data. Each scala type must be
  41. /// accounted here individually.
  42. fn total_size(&self) -> usize {
  43. self.log_size + self.voted_for.len() + size_of::<Self>()
  44. }
  45. }
  46. /// The shared data that should be put on disk.
  47. pub struct InMemoryState(Mutex<State>);
  48. /// The storage interface.
  49. #[derive(Clone)]
  50. pub struct InMemoryStorage {
  51. locked_state: Arc<InMemoryState>,
  52. max_state_bytes: usize,
  53. }
  54. impl RaftStorageTrait for InMemoryStorage {
  55. type RaftStoragePersister<LogEntry: RaftLogEntryRef> = InMemoryState;
  56. type RaftStorageMonitor = InMemoryStorageMonitor;
  57. fn persister<LogEntry>(self) -> Arc<Self::RaftStoragePersister<LogEntry>>
  58. where
  59. LogEntry: RaftLogEntryRef,
  60. {
  61. self.locked_state
  62. }
  63. fn read_state(&self) -> std::io::Result<RaftStoredState> {
  64. let stored = self.locked_state.0.lock();
  65. let snapshot_index = stored.snapshot_index;
  66. let mut organized_log = vec![];
  67. for op in &stored.log {
  68. if op.index <= snapshot_index {
  69. // Discard all entries that are before snapshot index.
  70. continue;
  71. }
  72. while organized_log
  73. .last()
  74. .map(|entry: &RaftStoredLogEntry| entry.index >= op.index)
  75. .unwrap_or(false)
  76. {
  77. organized_log.pop();
  78. }
  79. organized_log.push(RaftStoredLogEntry {
  80. index: op.index,
  81. term: op.term,
  82. command: op.command.clone(),
  83. });
  84. }
  85. Ok(RaftStoredState {
  86. current_term: stored.current_term,
  87. voted_for: stored.voted_for.clone(),
  88. log: organized_log,
  89. snapshot_index: stored.snapshot_index,
  90. snapshot_term: stored.snapshot_term,
  91. snapshot: stored.snapshot.clone(),
  92. })
  93. }
  94. fn monitor(&self) -> Self::RaftStorageMonitor {
  95. InMemoryStorageMonitor {
  96. stored: self.locked_state.clone(),
  97. max_state_bytes: self.max_state_bytes,
  98. }
  99. }
  100. }
  101. /// The storage monitor interface and controls compaction.
  102. pub struct InMemoryStorageMonitor {
  103. stored: Arc<InMemoryState>,
  104. max_state_bytes: usize,
  105. }
  106. impl RaftStorageMonitorTrait for InMemoryStorageMonitor {
  107. fn should_compact_log_now(&self) -> bool {
  108. let stored = self.stored.0.lock();
  109. let total_size = stored.total_size();
  110. return total_size > self.max_state_bytes;
  111. }
  112. }
  113. /// The persister interface that implements the logic.
  114. impl<LogEntry: RaftLogEntryRef> RaftStoragePersisterTrait<LogEntry>
  115. for InMemoryState
  116. {
  117. fn save_term_vote(&self, term: Term, voted_for: String) {
  118. let mut stored = self.0.lock();
  119. stored.current_term = term;
  120. stored.voted_for = voted_for;
  121. }
  122. fn append_one_entry(&self, entry: &LogEntry) {
  123. let mut stored = self.0.lock();
  124. stored.append_entry(RaftStoredLogEntry {
  125. index: entry.index(),
  126. term: entry.term(),
  127. command: entry.command_bytes(),
  128. });
  129. }
  130. fn append_entries<'a, LogEntryList>(&self, entries: LogEntryList)
  131. where
  132. LogEntry: 'a,
  133. LogEntryList: IntoIterator<Item = &'a LogEntry>,
  134. {
  135. let mut stored = self.0.lock();
  136. for entry in entries {
  137. stored.append_entry(RaftStoredLogEntry {
  138. index: entry.index(),
  139. term: entry.term(),
  140. command: entry.command_bytes(),
  141. })
  142. }
  143. }
  144. fn update_snapshot(&self, index: Index, term: Term, snapshot: &[u8]) {
  145. let mut stored = self.0.lock();
  146. stored.snapshot_index = index;
  147. stored.snapshot_term = term;
  148. stored.snapshot = snapshot.to_vec();
  149. while stored
  150. .log
  151. .front()
  152. .map(|e| e.index <= index)
  153. .unwrap_or(false)
  154. {
  155. let entry =
  156. stored.log.pop_front().expect("Popping must be successful");
  157. stored.log_size -= size_of::<RaftStoredLogEntry>();
  158. stored.log_size -= entry.command.len();
  159. }
  160. }
  161. }
  162. impl InMemoryStorage {
  163. /// Create a new storage with bytes limit.
  164. pub fn create(max_state_bytes: usize) -> Self {
  165. Self {
  166. locked_state: Arc::new(InMemoryState(Mutex::new(State::create()))),
  167. max_state_bytes,
  168. }
  169. }
  170. /// Save the entire in-memory state.
  171. pub fn save(&self) -> State {
  172. self.locked_state.0.lock().clone()
  173. }
  174. /// Restore the entire in-memory state, not including `max_state_bytes`.
  175. pub fn restore(&self, state: State) {
  176. *self.locked_state.0.lock() = state;
  177. }
  178. /// Returns the total bytes cost, not including snapshot.
  179. pub fn state_size(&self) -> usize {
  180. self.locked_state.0.lock().total_size()
  181. }
  182. /// Returns the bytes cost of the snapshot.
  183. pub fn snapshot_size(&self) -> usize {
  184. self.locked_state.0.lock().snapshot.len()
  185. }
  186. }
  #[cfg(test)]
  mod tests {
      use std::collections::VecDeque;
      use std::mem::size_of;
      use std::ops::Deref;

      use parking_lot::Mutex;
      use ruaft::storage::{
          RaftLogEntryRef, RaftStorageMonitorTrait, RaftStoragePersisterTrait,
          RaftStorageTrait,
      };
      use ruaft::{Index, Term};

      use crate::in_memory_storage::{InMemoryState, State};
      use crate::InMemoryStorage;

      /// A small log entry type used to drive the persister in tests.
      struct Transaction {
          index: Index,
          amount: f64,
          description: String,
      }

      impl Transaction {
          /// Builds an entry whose amount and one-letter description are
          /// derived from `index` (0 -> "a", 1 -> "b", ...).
          fn populate(index: Index) -> Self {
              Self {
                  index,
                  amount: index as f64 * 7.0,
                  description: char::from(b'a' + index as u8).to_string(),
              }
          }
      }

      impl RaftLogEntryRef for Transaction {
          fn index(&self) -> Index {
              self.index
          }

          /// Two consecutive indexes share one term.
          fn term(&self) -> Term {
              Term(self.index / 2)
          }

          /// Big-endian index, big-endian amount, then the description bytes.
          fn command_bytes(&self) -> Vec<u8> {
              let mut bytes = vec![];
              bytes.extend(self.index.to_be_bytes());
              bytes.extend(self.amount.to_be_bytes());
              bytes.extend(self.description.bytes());
              bytes
          }
      }

      /// Pins the persister's entry type to `Transaction` for calls whose
      /// arguments do not mention the entry type.
      fn type_hint(
          val: &InMemoryState,
      ) -> &impl RaftStoragePersisterTrait<Transaction> {
          val
      }

      #[test]
      fn test_append() {
          let state = InMemoryState(Mutex::new(State::create()));
          state.append_one_entry(&Transaction {
              index: 0,
              amount: 0.0,
              description: "a".to_owned(),
          });
          state.append_entries(&[
              Transaction {
                  index: 1,
                  amount: 1.0,
                  description: "test".to_owned(),
              },
              Transaction {
                  index: 2,
                  amount: -1.0,
                  description: "another".to_owned(),
              },
              Transaction {
                  index: 3,
                  amount: 1.0,
                  description: "test".to_owned(),
              },
          ]);
          // A repeated index is stored as an additional entry.
          state.append_one_entry(&Transaction {
              index: 1,
              amount: 2.0,
              description: "".to_owned(),
          });

          let state = state.0.lock();
          assert_eq!(0, state.current_term.0);
          assert!(state.voted_for.is_empty());
          assert_eq!(0, state.snapshot_index);
          assert!(state.snapshot.is_empty());
          assert_eq!(296, state.log_size);

          // log
          assert_eq!(5, state.log.len());

          // log[0]
          assert_eq!(0, state.log[0].index);
          assert_eq!(Term(0), state.log[0].term);
          assert_eq!(
              vec![
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // index
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
                  0x61, // "a"
              ],
              state.log[0].command
          );

          // log[1]
          let entry = &state.log[1];
          assert_eq!(1, entry.index);
          assert_eq!(Term(0), entry.term);
          assert_eq!(
              vec![
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, // index
                  0x3F, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
                  0x74, 0x65, 0x73, 0x74, // "test"
              ],
              entry.command
          );

          // log[2]
          let entry = &state.log[2];
          assert_eq!(2, entry.index);
          assert_eq!(Term(1), entry.term);
          assert_eq!(
              vec![
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, // index
                  0xBF, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
                  0x61, 0x6E, 0x6F, 0x74, 0x68, 0x65, 0x72 // "another"
              ],
              entry.command
          );

          // log[3]
          let entry = &state.log[3];
          assert_eq!(3, entry.index);
          assert_eq!(Term(1), entry.term);
          assert_eq!(
              vec![
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, // index
                  0x3F, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
                  0x74, 0x65, 0x73, 0x74, // "test"
              ],
              entry.command
          );

          // log[4]
          let entry = &state.log[4];
          assert_eq!(1, entry.index);
          assert_eq!(Term(0), entry.term);
          assert_eq!(
              vec![
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, // index
                  0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
              ],
              entry.command
          );
      }

      #[test]
      fn test_save_term_vote() {
          let state = InMemoryState(Mutex::new(State::create()));
          {
              let state = state.0.lock();
              assert_eq!(Term(0), state.current_term);
              assert!(state.voted_for.is_empty());
          }

          type_hint(&state).save_term_vote(Term(9), "hi".to_owned());

          let state = state.0.lock();
          assert_eq!(Term(9), state.current_term);
          assert_eq!("hi", &state.voted_for);
      }

      #[test]
      fn test_update_snapshot() {
          let state = InMemoryState(Mutex::new(State::create()));
          {
              let state = state.0.lock();
              assert_eq!(0, state.snapshot_index);
              assert!(state.snapshot.is_empty());
          }

          state.append_entries(&[
              Transaction::populate(0),
              Transaction::populate(1),
              Transaction::populate(7),
              Transaction::populate(8),
          ]);
          type_hint(&state).update_snapshot(7, Term(3), &[0x01, 0x02]);

          let state = state.0.lock();
          assert_eq!(7, state.snapshot_index);
          assert_eq!(Term(3), state.snapshot_term);
          assert_eq!(&[0x01, 0x02], state.snapshot.as_slice());
          // The first 3 entries are removed eagerly.
          assert_eq!(1, state.log.len());
      }

      #[test]
      fn test_read_state() {
          let storage = InMemoryStorage::create(0);
          let state = storage.clone().persister::<Transaction>();
          state.append_entries(&[
              Transaction::populate(0),
              Transaction::populate(1),
              Transaction::populate(2),
              Transaction::populate(3),
              Transaction {
                  index: 2,
                  amount: 1.0,
                  description: "hi".to_owned(),
              },
              Transaction::populate(4),
              Transaction::populate(5),
              Transaction::populate(5),
              Transaction::populate(5),
              Transaction::populate(6),
              Transaction {
                  index: 3,
                  amount: 1.0,
                  description: "hi".to_owned(),
              },
              Transaction::populate(7),
              Transaction::populate(7),
              Transaction::populate(7),
          ]);
          type_hint(&state).save_term_vote(Term(7), "voted_for".to_owned());
          type_hint(&state).update_snapshot(1, Term(0), &[0x99]);

          let raft_stored_state = storage
              .read_state()
              .expect("Read in-memory state should never fail");
          assert_eq!(Term(7), raft_stored_state.current_term);
          assert_eq!("voted_for", &raft_stored_state.voted_for);
          // Only the overriding entries at 2 and 3 and the last 7 survive.
          assert_eq!(3, raft_stored_state.log.len());
          assert_eq!(&[0x99], raft_stored_state.snapshot.as_slice());
          assert_eq!(1, raft_stored_state.snapshot_index);
          assert_eq!(Term(0), raft_stored_state.snapshot_term);

          let entry = &raft_stored_state.log[0];
          assert_eq!(2, entry.index);
          assert_eq!(Term(1), entry.term);
          assert_eq!(
              &[
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, // index
                  0x3F, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
                  0x68, 0x69, // "hi"
              ],
              entry.command.as_slice()
          );

          let entry = &raft_stored_state.log[1];
          assert_eq!(3, entry.index);
          assert_eq!(Term(1), entry.term);
          assert_eq!(
              &[
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, // index
                  0x3F, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
                  0x68, 0x69, // "hi"
              ],
              entry.command.as_slice()
          );

          let entry = &raft_stored_state.log[2];
          assert_eq!(7, entry.index);
          assert_eq!(Term(3), entry.term);
          assert_eq!(
              &[
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07, // index
                  0x40, 0x48, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
                  0x68, // "h"
              ],
              entry.command.as_slice()
          );

          assert_eq!(807, state.0.lock().total_size());
      }

      #[test]
      fn test_save_restore() {
          let storage = InMemoryStorage::create(0);
          let state = storage.clone().persister::<Transaction>();
          state.append_one_entry(&Transaction {
              index: 9,
              amount: 1.0,
              description: "hello".to_owned(),
          });
          let saved = storage.save();

          let another_storage = InMemoryStorage::create(100);
          another_storage.restore(saved);
          // The limit belongs to the storage, not to the restored state.
          assert_eq!(100, another_storage.max_state_bytes);

          let another_state = another_storage.locked_state.0.lock();
          let entry = &another_state.log[0];
          assert_eq!(9, entry.index);
          assert_eq!(Term(4), entry.term);
          assert_eq!(
              vec![
                  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, // index
                  0x3F, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // amount
                  0x68, 0x65, 0x6C, 0x6C, 0x6F, // "hello"
              ],
              entry.command
          );
      }

      #[test]
      fn test_total_size() {
          let state = State::create();
          assert_eq!(8, size_of::<Term>());
          assert_eq!(8, size_of::<Index>());
          assert_eq!(8, size_of::<usize>());
          assert_eq!(24, size_of::<String>());
          assert_eq!(24, size_of::<Vec<u8>>());
          assert_eq!(32, size_of::<VecDeque<u8>>());
          // 112 = 8 + 24 + 32 + 8 + 8 + 24 + 8
          let empty_size = 112;
          assert_eq!(empty_size, state.total_size());

          let state = InMemoryState(Mutex::new(State::create()));
          // command_size = 8 + 8 + 5 = 21
          // log_size = 8 + 8 + 24 (vec) + command_size = 61
          state.append_one_entry(&Transaction {
              index: 9,
              amount: 1.0,
              description: "hello".to_owned(),
          });
          assert_eq!(61, state.0.lock().log_size);
          assert_eq!(empty_size + 61, state.0.lock().total_size());
          // total_size() is verified in other tests with complex setup.
      }

      #[test]
      fn test_monitor() {
          let storage = InMemoryStorage::create(150);
          let state = storage.clone().persister::<Transaction>();
          let monitor = storage.monitor();
          assert_eq!(150, monitor.max_state_bytes);
          assert!(!monitor.should_compact_log_now());

          state.append_one_entry(&Transaction {
              index: 9,
              amount: 1.0,
              description: "hello".to_owned(),
          });
          assert_eq!(173, storage.state_size());
          assert!(monitor.should_compact_log_now());

          // The same state under a larger limit does not need compaction.
          let bigger_storage = InMemoryStorage::create(180);
          bigger_storage.restore(storage.save());
          assert_eq!(173, bigger_storage.state_size());
          let bigger_monitor = bigger_storage.monitor();
          assert!(!bigger_monitor.should_compact_log_now());
      }

      #[test]
      fn test_snapshot_size() {
          let storage = InMemoryStorage::create(0);
          let state = storage.clone().persister::<Transaction>();
          {
              let state = state.0.lock();
              assert_eq!(0, state.snapshot_index);
              assert!(state.snapshot.is_empty());
          }

          type_hint(state.deref()).update_snapshot(7, Term(3), &[0x01, 0x02]);
          assert_eq!(2, storage.snapshot_size());
      }
  }