//! Crate-internal storage adapters (`internal.rs`): glue between the public
//! `RaftStoragePersisterTrait` interface and the crate-private log types.

  1. use std::sync::Arc;
  2. use serde::de::DeserializeOwned;
  3. use serde::Serialize;
  4. use crate::log_array::{LogArray, LogEntry};
  5. use crate::raft_state::RaftState;
  6. use crate::storage::decode_and_encode::{
  7. decode_log_entry, decode_voted_for, encode_log_entry, encode_voted_for,
  8. };
  9. use crate::storage::{
  10. RaftLogEntryRef, RaftStoragePersisterTrait, RaftStoredState,
  11. };
  12. use crate::{Index, IndexTerm, Peer, Term};
  13. /// The internal interface of log entry persister. It is similar to
  14. /// `RaftStoragePersisterTrait`, but with concrete and private types. It
  15. /// provides better ergonomics to developers of this project.
  16. ///
  17. /// This trait cannot contain generic methods because it is made into a trait
  18. /// object in type `SharedLogPersister`. Trait objects are used to remove one
  19. /// more generic parameter on the overall `Raft` type.
  20. pub(crate) trait LogPersisterTrait<Command>: Send + Sync {
  21. /// Save term and vote from the RaftState.
  22. fn save_term_vote(&self, rf: &RaftState<Command>);
  23. /// Save one log entry. Blocking until the data is persisted.
  24. fn append_one_entry(&self, entry: &LogEntry<Command>);
  25. /// Save may log entries. Blocking until the data is persisted.
  26. fn append_entries(&self, entries: &[LogEntry<Command>]);
  27. /// Save snapshot. Blocking until the data is persisted.
  28. fn update_snapshot(&self, index_term: IndexTerm, snapshot: &[u8]);
  29. }
  30. /// A thin wrapper around `RaftStoragePersisterTrait`.
  31. impl<T, Command> LogPersisterTrait<Command> for T
  32. where
  33. Command: Serialize,
  34. T: RaftStoragePersisterTrait<LogEntry<Command>>,
  35. {
  36. fn save_term_vote(&self, rf: &RaftState<Command>) {
  37. <T as RaftStoragePersisterTrait<LogEntry<Command>>>::save_term_vote(
  38. self,
  39. rf.current_term,
  40. encode_voted_for(&rf.voted_for),
  41. )
  42. }
  43. fn append_one_entry(&self, entry: &LogEntry<Command>) {
  44. <T as RaftStoragePersisterTrait<LogEntry<Command>>>::append_one_entry(
  45. self, entry,
  46. )
  47. }
  48. fn append_entries(&self, entries: &[LogEntry<Command>]) {
  49. <T as RaftStoragePersisterTrait<LogEntry<Command>>>::append_entries(
  50. self, entries,
  51. )
  52. }
  53. fn update_snapshot(&self, index_term: IndexTerm, snapshot: &[u8]) {
  54. <T as RaftStoragePersisterTrait<LogEntry<Command>>>::update_snapshot(
  55. self,
  56. index_term.index,
  57. index_term.term,
  58. snapshot,
  59. )
  60. }
  61. }
  62. /// The crate-internal interface that is exposed to other parts of this crate.
  63. pub(crate) type SharedLogPersister<Command> =
  64. Arc<dyn LogPersisterTrait<Command>>;
  65. /// Adapter from the internal `LogEntry` type to the public interface.
  66. impl<Command: Serialize> RaftLogEntryRef for LogEntry<Command> {
  67. fn index(&self) -> Index {
  68. self.index
  69. }
  70. fn term(&self) -> Term {
  71. self.term
  72. }
  73. fn command_bytes(&self) -> Vec<u8> {
  74. encode_log_entry(self)
  75. }
  76. }
  77. impl RaftStoredState {
  78. pub(crate) fn current_term(&self) -> Term {
  79. self.current_term
  80. }
  81. pub(crate) fn voted_for(&self) -> Option<Peer> {
  82. decode_voted_for(&self.voted_for)
  83. .expect("Persisted log should not contain error")
  84. }
  85. pub(crate) fn restore_log_array<Command: DeserializeOwned>(
  86. self,
  87. log_array: &mut LogArray<Command>,
  88. ) {
  89. log_array.reset(self.snapshot_index, self.snapshot_term, self.snapshot);
  90. for entry in self.log.iter() {
  91. log_array.push(decode_log_entry(&entry.command));
  92. }
  93. }
  94. }
  #[cfg(test)]
  mod tests {
      use crate::log_array::LogArray;
      use crate::raft::Peer;
      use crate::storage::decode_and_encode::{encode_log_entry, encode_voted_for};
      use crate::storage::{RaftStoredLogEntry, RaftStoredState};
      use crate::{IndexTerm, Term};

      /// Encodes two entries into a `RaftStoredState`, restores them into a
      /// fresh `LogArray`, and checks that the bounds, the commands and the
      /// snapshot all match.
      #[test]
      fn test_restore_log_array() {
          // Reference log whose entries are encoded into the stored state.
          let mut expected = LogArray::create();
          expected.add_command(Term(1), 1i32);
          expected.add_command(Term(3), 7i32);

          let first = RaftStoredLogEntry {
              index: 1,
              term: Term(1),
              command: encode_log_entry(expected.at(1)),
          };
          let second = RaftStoredLogEntry {
              index: 2,
              term: Term(3),
              command: encode_log_entry(expected.at(2)),
          };
          let stored = RaftStoredState {
              current_term: Term(8),
              voted_for: encode_voted_for(&Some(Peer(1))),
              log: vec![first, second],
              snapshot_index: 0,
              snapshot_term: Term(1),
              snapshot: vec![0x09, 0x90],
          };

          let mut restored: LogArray<i32> = LogArray::create();
          stored.restore_log_array(&mut restored);

          // Same bounds as the reference log.
          assert_eq!(expected.start(), restored.start());
          assert_eq!(expected.end(), restored.end());

          // Same command at every index.
          for index in 1..=2 {
              assert_eq!(
                  expected.at(index).command(),
                  restored.at(index).command()
              );
          }

          // The snapshot metadata and bytes come from the stored state.
          assert_eq!(IndexTerm::pack(0, Term(1)), restored.snapshot().0);
          assert_eq!(&[0x09u8, 0x90u8], restored.snapshot().1);
      }
  }
  137. }