snapshot.rs 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. use std::sync::atomic::Ordering;
  2. use std::sync::Arc;
  3. use crossbeam_utils::sync::{Parker, Unparker};
  4. use parking_lot::{Condvar, Mutex};
  5. use crate::check_or_record;
  6. use crate::daemon_env::{Daemon, ErrorKind};
  7. use crate::{Index, Raft};
#[derive(Clone, Debug, Default)]
/// An application snapshot together with the log position it covers.
///
/// Produced by the application (delivered via `Raft::save_snapshot`) and
/// consumed by the snapshot daemon, which truncates the log up to
/// `last_included_index` once the snapshot is accepted.
pub struct Snapshot {
    /// Index of the last log entry whose effect is captured in `data`.
    /// A value of zero (the `Default`) marks an empty/placeholder snapshot.
    pub last_included_index: Index,
    /// Opaque snapshot bytes supplied by the application; stored alongside
    /// the persisted state when the snapshot is applied to the log.
    pub data: Vec<u8>,
}
#[derive(Clone, Debug, Default)]
/// Handle shared between the snapshot daemon thread and the rest of the Raft
/// instance. Cloning the handle shares the same staging area (`Arc`).
pub(crate) struct SnapshotDaemon {
    // Wakes the parked daemon thread. `None` until `run_snapshot_daemon()`
    // installs it; `trigger()` is a no-op before that.
    unparker: Option<Unparker>,
    // Staging area for the most recent snapshot, paired with a condvar that
    // `save_snapshot()` signals and the daemon thread waits on.
    current_snapshot: Arc<(Mutex<Snapshot>, Condvar)>,
}
/// Callback invoked by the snapshot daemon to ask the application for a new
/// snapshot that must contain at least the given log index.
///
/// The callback must be `Send` (it runs on the daemon thread) and `'static`
/// (the thread may outlive the caller's stack frame).
pub trait RequestSnapshotFnMut: 'static + Send + FnMut(Index) {}
// Blanket impl: any closure with the right bounds qualifies automatically.
impl<T: 'static + Send + FnMut(Index)> RequestSnapshotFnMut for T {}
  20. impl SnapshotDaemon {
  21. /// Saves the snapshot into the staging area of the daemon, before it is
  22. /// applied to the log.
  23. ///
  24. /// Does nothing if the snapshot has a lower index than any snapshot before.
  25. pub(crate) fn save_snapshot(&self, snapshot: Snapshot) {
  26. let mut curr = self.current_snapshot.0.lock();
  27. // The new snapshot can have a last_included_index that is smaller than
  28. // the current snapshot, if this instance is a follower and the leader
  29. // has installed a new snapshot on it.
  30. // Note some stale snapshots might still fall through the crack, if the
  31. // current snapshot has been taken by the snapshot daemon thread,
  32. // leaving a default value in curr. In that scenario,
  33. // last_included_index is zero, and any snapshots are allowed.
  34. if curr.last_included_index < snapshot.last_included_index {
  35. *curr = snapshot;
  36. }
  37. self.current_snapshot.1.notify_one();
  38. }
  39. /// Wakes up the daemon and gives it a chance to request a new snapshot.
  40. pub(crate) fn trigger(&self) {
  41. match &self.unparker {
  42. Some(unparker) => unparker.unpark(),
  43. None => {}
  44. }
  45. }
  46. const MIN_SNAPSHOT_INDEX_INTERVAL: usize = 100;
  47. /// Notifies the daemon that the log has grown. It might be a good time to
  48. /// request a new snapshot.
  49. // We cannot simply deliver snapshot requests as one type of commands in the
  50. // apply_command daemon. A snapshot should be requested when
  51. // 1. a new log entry is applied to the state machine, or
  52. // 2. when the log grow out of the limit.
  53. //
  54. // The first scenario fits naturally into the duties of apply_command. The
  55. // second one, i.e. log_grow(), does not. The apply_command daemon does not
  56. // get notified when the log grows. Waking up the daemon when the log grow
  57. // can be costly.
  58. //
  59. // Another option is to allow other daemons sending messages to the state
  60. // machine. That would require ApplyCommandFunc to be shared by two threads.
  61. // Adding more requirements to external interface is also something we would
  62. // rather not do.
  63. pub(crate) fn log_grow(&self, first_index: Index, last_index: Index) {
  64. if last_index - first_index > Self::MIN_SNAPSHOT_INDEX_INTERVAL {
  65. self.trigger();
  66. }
  67. }
  68. /// Wakes up the daemon and asks it to shutdown. Does not wait for the
  69. /// daemon to fully exit. This method never fails or blocks forever.
  70. pub(crate) fn kill(&self) {
  71. self.trigger();
  72. // Acquire the lock to make sure the daemon thread either has been
  73. // waiting on the condition, or has not checked `keep_running` yet.
  74. let _guard = self.current_snapshot.0.lock();
  75. self.current_snapshot.1.notify_all();
  76. }
  77. }
impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
    /// Saves the snapshot into a staging area before it is applied to the log.
    ///
    /// Does nothing if the snapshot has a lower index than any snapshot before.
    ///
    /// This method Will not block forever. It contains minimal logic so that it
    /// is safe to run on an application thread. There is no guarantee that the
    /// saved snapshot will be applied to the internal log.
    pub fn save_snapshot(&self, snapshot: Snapshot) {
        self.snapshot_daemon.save_snapshot(snapshot)
    }

    /// Runs a daemon that requests and handles application snapshot.
    ///
    /// A snapshot must be taken when the size of the persisted log exceeds the
    /// limit specified by `max_state_size`. The daemon also attempts to take
    /// the snapshot when there are more than 100 entries in the log.
    ///
    /// A snapshot is requested by calling `request_snapshot`. The callback
    /// accepts a parameter that specifies the minimal log index the new
    /// snapshot must contain. The callback should not block. The callback could
    /// be called again when a snapshot is being prepared. The callback can be
    /// called multiple times with the same minimal log index.
    ///
    /// A new snapshot is delivered by calling [`Raft::save_snapshot`]. The new
    /// snapshot will be saved in a temporary space. This daemon will wake up,
    /// apply the snapshot into the log and discard log entries before the
    /// snapshot. There is no guarantee that the snapshot will be applied.
    pub(crate) fn run_snapshot_daemon(
        &mut self,
        max_state_size: Option<usize>,
        mut request_snapshot: impl RequestSnapshotFnMut,
    ) {
        // No size limit means snapshotting is disabled: never start the
        // daemon thread (and leave `unparker` as None, so triggers are no-ops).
        let max_state_size = match max_state_size {
            Some(max_state_size) => max_state_size,
            None => return,
        };
        let parker = Parker::new();
        let unparker = parker.unparker().clone();
        // Publish the unparker so trigger()/kill()/log_grow() can wake us.
        self.snapshot_daemon.unparker.replace(unparker.clone());
        let keep_running = self.keep_running.clone();
        let me = self.me;
        let rf = self.inner_state.clone();
        let persister = self.persister.clone();
        let snapshot_daemon = self.snapshot_daemon.clone();
        let daemon_env = self.daemon_env.clone();
        let stop_wait_group = self.stop_wait_group.clone();
        log::info!("{:?} snapshot daemon running ...", me);
        let join_handle = std::thread::spawn(move || loop {
            // Note: do not change this to `let _ = ...`.
            // `let _ = ...` would drop the guard immediately; the binding must
            // live until the end of the loop iteration.
            let _guard = daemon_env.for_scope();
            // Sleep until someone calls trigger()/kill() via the unparker.
            parker.park();
            if !keep_running.load(Ordering::SeqCst) {
                log::info!("{:?} snapshot daemon done.", me);
                // Explicitly drop every thing.
                drop(keep_running);
                drop(rf);
                drop(persister);
                drop(snapshot_daemon);
                drop(daemon_env);
                drop(stop_wait_group);
                break;
            }
            // Only bother taking a snapshot once the persisted state has
            // grown past the configured limit.
            if persister.state_size() >= max_state_size {
                let log_start = rf.lock().log.first_index_term();
                let snapshot = {
                    let mut snapshot =
                        snapshot_daemon.current_snapshot.0.lock();
                    // The staged snapshot is stale (does not extend past the
                    // current log start): ask the application for a fresh one
                    // and block on the condvar until save_snapshot() (or
                    // kill()) signals. The lock is held across the callback;
                    // the callback contract above requires it not to block.
                    if keep_running.load(Ordering::SeqCst)
                        && snapshot.last_included_index <= log_start.index
                    {
                        request_snapshot(log_start.index + 1);
                        snapshot_daemon.current_snapshot.1.wait(&mut snapshot);
                    }
                    // Take the snapshot and set the index to zero in its place.
                    // This defeats the stale snapshot protection implemented in
                    // `save_snapshot()`. Stale snapshots are tolerated below.
                    use std::ops::DerefMut;
                    std::mem::take(snapshot.deref_mut())
                };
                let mut rf = rf.lock();
                // The inner-state lock was released while we waited; re-check
                // that the log start we captured is still current.
                if rf.log.first_index_term() != log_start {
                    // Another snapshot was installed, let's try again.
                    unparker.unpark();
                    continue;
                }
                if snapshot.last_included_index <= rf.log.start() {
                    // It seems the request_snapshot callback is misbehaving,
                    // let's try again.
                    // NOTE(review): this branch also absorbs condvar wakeups
                    // that left a default (index 0) snapshot in the staging
                    // area — e.g. the notify_all() issued by kill().
                    unparker.unpark();
                    continue;
                }
                check_or_record!(
                    snapshot.last_included_index < rf.log.end(),
                    ErrorKind::SnapshotAfterLogEnd(
                        snapshot.last_included_index,
                    ),
                    "Snapshot contains data that is not in the log. \
                    This could happen when logs shrinks.",
                    &rf
                );
                check_or_record!(
                    snapshot.last_included_index <= rf.commit_index,
                    ErrorKind::SnapshotNotCommitted(
                        snapshot.last_included_index
                    ),
                    "Snapshot contains data that is not committed. \
                    This should never happen.",
                    &rf
                );
                // SNAPSHOT_INDEX_INVARIANT: log.start() is shifted to
                // last_included_index. We checked that last_included_index is
                // smaller than commit_index. This is the only place where
                // log.start() changes.
                rf.log.shift(snapshot.last_included_index, snapshot.data);
                // Persist the state and the snapshot together so they cannot
                // diverge across a crash.
                persister.save_snapshot_and_state(
                    rf.persisted_state().into(),
                    rf.log.snapshot().1,
                );
            }
        });
        // Register the thread so shutdown can join it and surface panics.
        self.daemon_env.watch_daemon(Daemon::Snapshot, join_handle);
    }
}