snapshot.rs 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. use std::sync::atomic::Ordering;
  2. use std::sync::Arc;
  3. use crossbeam_utils::sync::{Parker, Unparker};
  4. use parking_lot::{Condvar, Mutex};
  5. use crate::check_or_record;
  6. use crate::daemon_env::{Daemon, ErrorKind};
  7. use crate::{Index, Raft};
  8. #[derive(Clone, Debug, Default)]
  9. pub struct Snapshot {
  10. pub last_included_index: Index,
  11. pub data: Vec<u8>,
  12. }
  13. #[derive(Clone, Debug, Default)]
  14. pub(crate) struct SnapshotDaemon {
  15. unparker: Option<Unparker>,
  16. current_snapshot: Arc<(Mutex<Snapshot>, Condvar)>,
  17. }
  18. pub trait RequestSnapshotFnMut: 'static + Send + FnMut(Index) {}
  19. impl<T: 'static + Send + FnMut(Index)> RequestSnapshotFnMut for T {}
  20. impl SnapshotDaemon {
  21. /// Saves the snapshot into the staging area of the daemon, before it is
  22. /// applied to the log.
  23. ///
  24. /// Does nothing if the snapshot has a lower index than any snapshot before.
  25. pub(crate) fn save_snapshot(&self, snapshot: Snapshot) {
  26. let mut curr = self.current_snapshot.0.lock();
  27. // The new snapshot can have a last_included_index that is smaller than
  28. // the current snapshot, if this instance is a follower and the leader
  29. // has installed a new snapshot on it.
  30. if curr.last_included_index < snapshot.last_included_index {
  31. *curr = snapshot;
  32. }
  33. self.current_snapshot.1.notify_one();
  34. }
  35. /// Wakes up the daemon and gives it a chance to request a new snapshot.
  36. pub(crate) fn trigger(&self) {
  37. match &self.unparker {
  38. Some(unparker) => unparker.unpark(),
  39. None => {}
  40. }
  41. }
  42. const MIN_SNAPSHOT_INDEX_INTERVAL: usize = 100;
  43. /// Notifies the daemon that the log has grown. It might be a good time to
  44. /// request a new snapshot.
  45. // We cannot simply deliver snapshot requests as one type of commands in the
  46. // apply_command daemon. A snapshot should be requested when
  47. // 1. a new log entry is applied to the state machine, or
  48. // 2. when the log grow out of the limit.
  49. //
  50. // The first scenario fits naturally into the duties of apply_command. The
  51. // second one, i.e. log_grow(), does not. The apply_command daemon does not
  52. // get notified when the log grows. Waking up the daemon when the log grow
  53. // can be costly.
  54. //
  55. // Another option is to allow other daemons sending messages to the state
  56. // machine. That would require ApplyCommandFunc to be shared by two threads.
  57. // Adding more requirements to external interface is also something we would
  58. // rather not do.
  59. pub(crate) fn log_grow(&self, first_index: Index, last_index: Index) {
  60. if last_index - first_index > Self::MIN_SNAPSHOT_INDEX_INTERVAL {
  61. self.trigger();
  62. }
  63. }
  64. /// Wakes up the daemon and asks it to shutdown. Does not wait for the
  65. /// daemon to fully exit. This method never fails or blocks forever.
  66. pub(crate) fn kill(&self) {
  67. self.trigger();
  68. // Acquire the lock to make sure the daemon thread either has been
  69. // waiting on the condition, or has not checked `keep_running` yet.
  70. let _guard = self.current_snapshot.0.lock();
  71. self.current_snapshot.1.notify_all();
  72. }
  73. }
  74. impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
  75. /// Saves the snapshot into a staging area before it is applied to the log.
  76. ///
  77. /// Does nothing if the snapshot has a lower index than any snapshot before.
  78. ///
  79. /// This method Will not block forever. It contains minimal logic so that it
  80. /// is safe to run on an application thread. There is no guarantee that the
  81. /// saved snapshot will be applied to the internal log.
  82. pub fn save_snapshot(&self, snapshot: Snapshot) {
  83. self.snapshot_daemon.save_snapshot(snapshot)
  84. }
  85. /// Runs a daemon that requests and handles application snapshot.
  86. ///
  87. /// A snapshot must be taken when the size of the persisted log exceeds the
  88. /// limit specified by `max_state_size`. The daemon also attempts to take
  89. /// the snapshot when there are more than 100 entries in the log.
  90. ///
  91. /// A snapshot is requested by calling `request_snapshot`. The callback
  92. /// accepts a parameter that specifies the minimal log index the new
  93. /// snapshot must contain. The callback should not block. The callback could
  94. /// be called again when a snapshot is being prepared. The callback can be
  95. /// called multiple times with the same minimal log index.
  96. ///
  97. /// A new snapshot is delivered by calling [`Raft::save_snapshot`]. The new
  98. /// snapshot will be saved in a temporary space. This daemon will wake up,
  99. /// apply the snapshot into the log and discard log entries before the
  100. /// snapshot. There is no guarantee that the snapshot will be applied.
  101. pub(crate) fn run_snapshot_daemon(
  102. &mut self,
  103. max_state_size: Option<usize>,
  104. mut request_snapshot: impl RequestSnapshotFnMut,
  105. ) {
  106. let max_state_size = match max_state_size {
  107. Some(max_state_size) => max_state_size,
  108. None => return,
  109. };
  110. let parker = Parker::new();
  111. let unparker = parker.unparker().clone();
  112. self.snapshot_daemon.unparker.replace(unparker.clone());
  113. let keep_running = self.keep_running.clone();
  114. let me = self.me;
  115. let rf = self.inner_state.clone();
  116. let persister = self.persister.clone();
  117. let snapshot_daemon = self.snapshot_daemon.clone();
  118. let daemon_env = self.daemon_env.clone();
  119. let stop_wait_group = self.stop_wait_group.clone();
  120. log::info!("{:?} snapshot daemon running ...", me);
  121. let join_handle = std::thread::spawn(move || loop {
  122. // Note: do not change this to `let _ = ...`.
  123. let _guard = daemon_env.for_scope();
  124. parker.park();
  125. if !keep_running.load(Ordering::SeqCst) {
  126. log::info!("{:?} snapshot daemon done.", me);
  127. // Explicitly drop every thing.
  128. drop(keep_running);
  129. drop(rf);
  130. drop(persister);
  131. drop(snapshot_daemon);
  132. drop(daemon_env);
  133. drop(stop_wait_group);
  134. break;
  135. }
  136. if persister.state_size() >= max_state_size {
  137. let log_start = rf.lock().log.first_index_term();
  138. let snapshot = {
  139. let mut snapshot =
  140. snapshot_daemon.current_snapshot.0.lock();
  141. if keep_running.load(Ordering::SeqCst)
  142. && snapshot.last_included_index <= log_start.index
  143. {
  144. request_snapshot(log_start.index + 1);
  145. snapshot_daemon.current_snapshot.1.wait(&mut snapshot);
  146. }
  147. snapshot.clone()
  148. };
  149. let mut rf = rf.lock();
  150. if rf.log.first_index_term() != log_start {
  151. // Another snapshot was installed, let's try again.
  152. unparker.unpark();
  153. continue;
  154. }
  155. if snapshot.last_included_index <= rf.log.start() {
  156. // It seems the request_snapshot callback is misbehaving,
  157. // let's try again.
  158. unparker.unpark();
  159. continue;
  160. }
  161. check_or_record!(
  162. snapshot.last_included_index < rf.log.end(),
  163. ErrorKind::SnapshotAfterLogEnd(
  164. snapshot.last_included_index,
  165. ),
  166. "Snapshot contains data that is not in the log. \
  167. This could happen when logs shrinks.",
  168. &rf
  169. );
  170. check_or_record!(
  171. snapshot.last_included_index <= rf.commit_index,
  172. ErrorKind::SnapshotNotCommitted(
  173. snapshot.last_included_index
  174. ),
  175. "Snapshot contains data that is not committed. \
  176. This should never happen.",
  177. &rf
  178. );
  179. // SNAPSHOT_INDEX_INVARIANT: log.start() is shifted to
  180. // last_included_index. We checked that last_included_index is
  181. // smaller than commit_index. This is the only place where
  182. // log.start() changes.
  183. rf.log.shift(snapshot.last_included_index, snapshot.data);
  184. persister.save_snapshot_and_state(
  185. rf.persisted_state().into(),
  186. rf.log.snapshot().1,
  187. );
  188. }
  189. });
  190. self.daemon_env.watch_daemon(Daemon::Snapshot, join_handle);
  191. }
  192. }