// snapshot.rs
  1. use std::sync::atomic::Ordering;
  2. use std::sync::Arc;
  3. use crossbeam_utils::sync::{Parker, Unparker};
  4. use parking_lot::{Condvar, Mutex};
  5. use crate::check_or_record;
  6. use crate::daemon_env::{Daemon, ErrorKind};
  7. use crate::{Index, Raft};
/// An application snapshot plus the log index it covers.
///
/// `Default` yields an empty snapshot with `last_included_index == 0`,
/// which the staging logic in `SnapshotDaemon::save_snapshot` treats as
/// "accept any snapshot".
#[derive(Clone, Debug, Default)]
pub struct Snapshot {
    /// Index of the last log entry whose effect is included in `data`.
    pub last_included_index: Index,
    /// Opaque snapshot bytes supplied by the application via
    /// `Raft::save_snapshot`.
    pub data: Vec<u8>,
}
/// Shared handle for coordinating snapshot requests between the Raft
/// instance, the application, and the snapshot daemon thread.
///
/// Clones share the same staging area (`current_snapshot` is behind an
/// `Arc`), so waking and saving work across all clones.
#[derive(Clone, Debug)]
pub(crate) struct SnapshotDaemon {
    /// Wakes the daemon thread. `None` until `Raft::run_snapshot_daemon`
    /// installs it; `trigger()` is then a no-op.
    unparker: Option<Unparker>,
    /// Staging area holding the most recently saved snapshot, paired with a
    /// condvar that signals the daemon thread when a new snapshot arrives.
    current_snapshot: Arc<(Mutex<Snapshot>, Condvar)>,
}
/// Callback that asks the application to produce a snapshot containing at
/// least the given log index. Blanket-implemented for every `'static + Send`
/// closure of the matching shape, so users never implement it by hand.
pub trait RequestSnapshotFnMut: 'static + Send + FnMut(Index) {}
impl<T: 'static + Send + FnMut(Index)> RequestSnapshotFnMut for T {}
  20. impl SnapshotDaemon {
  21. /// Create a new snapshot daemon.
  22. pub fn create() -> Self {
  23. Self {
  24. unparker: None,
  25. current_snapshot: Default::default(),
  26. }
  27. }
  28. /// Saves the snapshot into the staging area of the daemon, before it is
  29. /// applied to the log.
  30. ///
  31. /// Does nothing if the snapshot has a lower index than any snapshot before.
  32. pub(crate) fn save_snapshot(&self, snapshot: Snapshot) {
  33. let mut curr = self.current_snapshot.0.lock();
  34. // The new snapshot can have a last_included_index that is smaller than
  35. // the current snapshot, if this instance is a follower and the leader
  36. // has installed a new snapshot on it.
  37. // Note some stale snapshots might still fall through the crack, if the
  38. // current snapshot has been taken by the snapshot daemon thread,
  39. // leaving a default value in curr. In that scenario,
  40. // last_included_index is zero, and any snapshots are allowed.
  41. if curr.last_included_index < snapshot.last_included_index {
  42. *curr = snapshot;
  43. }
  44. self.current_snapshot.1.notify_one();
  45. }
  46. /// Wakes up the daemon and gives it a chance to request a new snapshot.
  47. pub(crate) fn trigger(&self) {
  48. match &self.unparker {
  49. Some(unparker) => unparker.unpark(),
  50. None => {}
  51. }
  52. }
  53. const MIN_SNAPSHOT_INDEX_INTERVAL: usize = 100;
  54. /// Notifies the daemon that the log has grown. It might be a good time to
  55. /// request a new snapshot.
  56. // We cannot simply deliver snapshot requests as one type of commands in the
  57. // apply_command daemon. A snapshot should be requested when
  58. // 1. a new log entry is applied to the state machine, or
  59. // 2. when the log grow out of the limit.
  60. //
  61. // The first scenario fits naturally into the duties of apply_command. The
  62. // second one, i.e. log_grow(), does not. The apply_command daemon does not
  63. // get notified when the log grows. Waking up the daemon when the log grow
  64. // can be costly.
  65. //
  66. // Another option is to allow other daemons sending messages to the state
  67. // machine. That would require ApplyCommandFunc to be shared by two threads.
  68. // Adding more requirements to external interface is also something we would
  69. // rather not do.
  70. pub(crate) fn log_grow(&self, first_index: Index, last_index: Index) {
  71. if last_index - first_index > Self::MIN_SNAPSHOT_INDEX_INTERVAL {
  72. self.trigger();
  73. }
  74. }
  75. /// Wakes up the daemon and asks it to shutdown. Does not wait for the
  76. /// daemon to fully exit. This method never fails or blocks forever.
  77. pub(crate) fn kill(&self) {
  78. self.trigger();
  79. // Acquire the lock to make sure the daemon thread either has been
  80. // waiting on the condition, or has not checked `keep_running` yet.
  81. let _guard = self.current_snapshot.0.lock();
  82. self.current_snapshot.1.notify_all();
  83. }
  84. }
impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
    /// Saves the snapshot into a staging area before it is applied to the log.
    ///
    /// Does nothing if the snapshot has a lower index than any snapshot before.
    ///
    /// This method will not block forever. It contains minimal logic so that it
    /// is safe to run on an application thread. There is no guarantee that the
    /// saved snapshot will be applied to the internal log.
    pub fn save_snapshot(&self, snapshot: Snapshot) {
        self.snapshot_daemon.save_snapshot(snapshot)
    }

    /// Runs a daemon that requests and handles application snapshot.
    ///
    /// A snapshot must be taken when the size of the persisted log exceeds the
    /// limit specified by `max_state_size`. The daemon also attempts to take
    /// the snapshot when there are more than 100 entries in the log.
    ///
    /// A snapshot is requested by calling `request_snapshot`. The callback
    /// accepts a parameter that specifies the minimal log index the new
    /// snapshot must contain. The callback should not block. The callback could
    /// be called again when a snapshot is being prepared. The callback can be
    /// called multiple times with the same minimal log index.
    ///
    /// A new snapshot is delivered by calling [`Raft::save_snapshot`]. The new
    /// snapshot will be saved in a temporary space. This daemon will wake up,
    /// apply the snapshot into the log and discard log entries before the
    /// snapshot. There is no guarantee that the snapshot will be applied.
    pub(crate) fn run_snapshot_daemon(
        &mut self,
        max_state_size: Option<usize>,
        mut request_snapshot: impl RequestSnapshotFnMut,
    ) {
        // Snapshotting is disabled entirely when no size limit is configured.
        let max_state_size = match max_state_size {
            Some(max_state_size) => max_state_size,
            None => return,
        };
        let parker = Parker::new();
        let unparker = parker.unparker().clone();
        // Install the unparker so that trigger() / log_grow() / kill() can
        // wake the daemon thread created below.
        self.snapshot_daemon.unparker.replace(unparker.clone());
        let keep_running = self.keep_running.clone();
        let me = self.me;
        let rf = self.inner_state.clone();
        let persister = self.persister.clone();
        let snapshot_daemon = self.snapshot_daemon.clone();
        let stop_wait_group = self.stop_wait_group.clone();
        log::info!("{:?} snapshot daemon running ...", me);
        let snapshot_daemon = move || loop {
            // Sleep until someone unparks us (trigger() or kill()).
            parker.park();
            if !keep_running.load(Ordering::SeqCst) {
                log::info!("{:?} snapshot daemon done.", me);
                // Explicitly drop every thing.
                drop(keep_running);
                drop(rf);
                drop(persister);
                drop(snapshot_daemon);
                drop(stop_wait_group);
                break;
            }
            if persister.state_size() >= max_state_size {
                let log_start = rf.lock().log.first_index_term();
                let snapshot = {
                    let mut snapshot =
                        snapshot_daemon.current_snapshot.0.lock();
                    // Only request a new snapshot if the staged one does not
                    // already cover entries past the current log start. The
                    // re-check of keep_running pairs with the lock taken in
                    // kill() so a shutdown is never missed.
                    if keep_running.load(Ordering::SeqCst)
                        && snapshot.last_included_index <= log_start.index
                    {
                        request_snapshot(log_start.index + 1);
                        // Released by save_snapshot() or kill().
                        // NOTE(review): this wait is not in a loop, so a
                        // save_snapshot() call carrying a stale snapshot can
                        // release it early; the checks below tolerate that.
                        snapshot_daemon.current_snapshot.1.wait(&mut snapshot);
                    }
                    // Take the snapshot and set the index to zero in its place.
                    // This defeats the stale snapshot protection implemented in
                    // `save_snapshot()`. Stale snapshots are tolerated below.
                    use std::ops::DerefMut;
                    std::mem::take(snapshot.deref_mut())
                };
                let mut rf = rf.lock();
                if rf.log.first_index_term() != log_start {
                    // Another snapshot was installed, let's try again.
                    unparker.unpark();
                    continue;
                }
                if snapshot.last_included_index <= rf.log.start() {
                    // It seems the request_snapshot callback is misbehaving,
                    // let's try again.
                    unparker.unpark();
                    continue;
                }
                check_or_record!(
                    snapshot.last_included_index < rf.log.end(),
                    ErrorKind::SnapshotAfterLogEnd(
                        snapshot.last_included_index,
                    ),
                    "Snapshot contains data that is not in the log. \
                     This could happen when logs shrinks.",
                    &rf
                );
                check_or_record!(
                    snapshot.last_included_index <= rf.commit_index,
                    ErrorKind::SnapshotNotCommitted(
                        snapshot.last_included_index
                    ),
                    "Snapshot contains data that is not committed. \
                     This should never happen.",
                    &rf
                );
                // SNAPSHOT_INDEX_INVARIANT: log.start() is shifted to
                // last_included_index. We checked that last_included_index is
                // smaller than commit_index. This is the only place where
                // log.start() changes.
                rf.log.shift(snapshot.last_included_index, snapshot.data);
                // Persist the shifted log and the snapshot atomically.
                persister.save_snapshot_and_state(
                    rf.persisted_state().into(),
                    rf.log.snapshot().1,
                );
            }
        };
        self.daemon_env
            .watch_daemon(Daemon::Snapshot, snapshot_daemon);
    }
}