snapshot.rs 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. use std::sync::atomic::Ordering;
  2. use std::sync::Arc;
  3. use crossbeam_utils::sync::{Parker, Unparker};
  4. use parking_lot::{Condvar, Mutex};
  5. use crate::check_or_record;
  6. use crate::daemon_env::{Daemon, ErrorKind};
  7. use crate::{Index, Raft};
  8. #[derive(Clone, Debug, Default)]
  9. pub struct Snapshot {
  10. pub last_included_index: Index,
  11. pub data: Vec<u8>,
  12. }
  13. #[derive(Clone, Debug, Default)]
  14. pub(crate) struct SnapshotDaemon {
  15. unparker: Option<Unparker>,
  16. current_snapshot: Arc<(Mutex<Snapshot>, Condvar)>,
  17. }
  18. pub trait RequestSnapshotFnMut: 'static + Send + FnMut(Index) {}
  19. impl<T: 'static + Send + FnMut(Index)> RequestSnapshotFnMut for T {}
  20. impl SnapshotDaemon {
  21. /// Saves the snapshot into the staging area of the daemon, before it is
  22. /// applied to the log.
  23. ///
  24. /// Does nothing if the snapshot has a lower index than any snapshot before.
  25. pub(crate) fn save_snapshot(&self, snapshot: Snapshot) {
  26. let mut curr = self.current_snapshot.0.lock();
  27. // The new snapshot can have a last_included_index that is smaller than
  28. // the current snapshot, if this instance is a follower and the leader
  29. // has installed a new snapshot on it.
  30. if curr.last_included_index < snapshot.last_included_index {
  31. *curr = snapshot;
  32. }
  33. self.current_snapshot.1.notify_one();
  34. }
  35. /// Wakes up the daemon and gives it a chance to request a new snapshot.
  36. pub(crate) fn trigger(&self) {
  37. match &self.unparker {
  38. Some(unparker) => unparker.unpark(),
  39. None => {}
  40. }
  41. }
  42. const MIN_SNAPSHOT_INDEX_INTERVAL: usize = 100;
  43. /// Notifies the daemon that the log has grown. It might be a good time to
  44. /// request a new snapshot.
  45. pub(crate) fn log_grow(&self, first_index: Index, last_index: Index) {
  46. if last_index - first_index > Self::MIN_SNAPSHOT_INDEX_INTERVAL {
  47. self.trigger();
  48. }
  49. }
  50. /// Wakes up the daemon and asks it to shutdown. Does not wait for the
  51. /// daemon to fully exit. This method never fails or blocks forever.
  52. pub(crate) fn kill(&self) {
  53. self.trigger();
  54. // Acquire the lock to make sure the daemon thread either has been
  55. // waiting on the condition, or has not checked `keep_running` yet.
  56. let _guard = self.current_snapshot.0.lock();
  57. self.current_snapshot.1.notify_all();
  58. }
  59. }
  60. impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
  61. /// Saves the snapshot into a staging area before it is applied to the log.
  62. ///
  63. /// Does nothing if the snapshot has a lower index than any snapshot before.
  64. ///
  65. /// This method Will not block forever. It contains minimal logic so that it
  66. /// is safe to run on an application thread. There is no guarantee that the
  67. /// saved snapshot will be applied to the internal log.
  68. pub fn save_snapshot(&self, snapshot: Snapshot) {
  69. self.snapshot_daemon.save_snapshot(snapshot)
  70. }
  71. /// Runs a daemon that requests and handles application snapshot.
  72. ///
  73. /// A snapshot must be taken when the size of the persisted log exceeds the
  74. /// limit specified by `max_state_size`. The daemon also attempts to take
  75. /// the snapshot when there are more than 100 entries in the log.
  76. ///
  77. /// A snapshot is requested by calling `request_snapshot`. The callback
  78. /// accepts a parameter that specifies the minimal log index the new
  79. /// snapshot must contain. The callback should not block. The callback could
  80. /// be called again when a snapshot is being prepared. The callback can be
  81. /// called multiple times with the same minimal log index.
  82. ///
  83. /// A new snapshot is delivered by calling [`Raft::save_snapshot`]. The new
  84. /// snapshot will be saved in a temporary space. This daemon will wake up,
  85. /// apply the snapshot into the log and discard log entries before the
  86. /// snapshot. There is no guarantee that the snapshot will be applied.
  87. pub(crate) fn run_snapshot_daemon(
  88. &mut self,
  89. max_state_size: Option<usize>,
  90. mut request_snapshot: impl RequestSnapshotFnMut,
  91. ) {
  92. let max_state_size = match max_state_size {
  93. Some(max_state_size) => max_state_size,
  94. None => return,
  95. };
  96. let parker = Parker::new();
  97. let unparker = parker.unparker().clone();
  98. self.snapshot_daemon.unparker.replace(unparker.clone());
  99. let keep_running = self.keep_running.clone();
  100. let me = self.me;
  101. let rf = self.inner_state.clone();
  102. let persister = self.persister.clone();
  103. let snapshot_daemon = self.snapshot_daemon.clone();
  104. let daemon_env = self.daemon_env.clone();
  105. let stop_wait_group = self.stop_wait_group.clone();
  106. log::info!("{:?} snapshot daemon running ...", me);
  107. let join_handle = std::thread::spawn(move || loop {
  108. // Note: do not change this to `let _ = ...`.
  109. let _guard = daemon_env.for_scope();
  110. parker.park();
  111. if !keep_running.load(Ordering::SeqCst) {
  112. log::info!("{:?} snapshot daemon done.", me);
  113. // Explicitly drop every thing.
  114. drop(keep_running);
  115. drop(rf);
  116. drop(persister);
  117. drop(snapshot_daemon);
  118. drop(daemon_env);
  119. drop(stop_wait_group);
  120. break;
  121. }
  122. if persister.state_size() >= max_state_size {
  123. let log_start = rf.lock().log.first_index_term();
  124. let snapshot = {
  125. let mut snapshot =
  126. snapshot_daemon.current_snapshot.0.lock();
  127. if keep_running.load(Ordering::SeqCst)
  128. && snapshot.last_included_index <= log_start.index
  129. {
  130. request_snapshot(log_start.index + 1);
  131. snapshot_daemon.current_snapshot.1.wait(&mut snapshot);
  132. }
  133. snapshot.clone()
  134. };
  135. let mut rf = rf.lock();
  136. if rf.log.first_index_term() != log_start {
  137. // Another snapshot was installed, let's try again.
  138. unparker.unpark();
  139. continue;
  140. }
  141. if snapshot.last_included_index <= rf.log.start() {
  142. // It seems the request_snapshot callback is misbehaving,
  143. // let's try again.
  144. unparker.unpark();
  145. continue;
  146. }
  147. check_or_record!(
  148. snapshot.last_included_index < rf.log.end(),
  149. ErrorKind::SnapshotAfterLogEnd(
  150. snapshot.last_included_index,
  151. ),
  152. "Snapshot contains data that is not in the log. \
  153. This could happen when logs shrinks.",
  154. &rf
  155. );
  156. check_or_record!(
  157. snapshot.last_included_index <= rf.commit_index,
  158. ErrorKind::SnapshotNotCommitted(
  159. snapshot.last_included_index
  160. ),
  161. "Snapshot contains data that is not committed. \
  162. This should never happen.",
  163. &rf
  164. );
  165. // SNAPSHOT_INDEX_INVARIANT: log.start() is shifted to
  166. // last_included_index. We checked that last_included_index is
  167. // smaller than commit_index. This is the only place where
  168. // log.start() changes.
  169. rf.log.shift(snapshot.last_included_index, snapshot.data);
  170. persister.save_snapshot_and_state(
  171. rf.persisted_state().into(),
  172. rf.log.snapshot().1,
  173. );
  174. }
  175. });
  176. self.daemon_env.watch_daemon(Daemon::Snapshot, join_handle);
  177. }
  178. }