//! snapshot.rs — the `Snapshot` type and the background daemon that drives
//! Raft log compaction.
  1. use std::sync::atomic::Ordering;
  2. use crossbeam_utils::sync::{Parker, Unparker};
  3. use crate::{Index, Raft};
  4. use parking_lot::{Condvar, Mutex};
  5. use std::sync::Arc;
/// A point-in-time snapshot of the application state machine, replacing all
/// log entries up to and including `last_included_index`.
#[derive(Clone, Debug, Default)]
pub struct Snapshot {
    // Index of the last log entry this snapshot covers.
    pub last_included_index: Index,
    // Opaque serialized application state; interpreted by the application,
    // not by Raft.
    pub data: Vec<u8>,
}
/// Cloneable handle shared between Raft and the background snapshot thread.
#[derive(Clone, Debug, Default)]
pub(crate) struct SnapshotDaemon {
    // Wakes the daemon thread parked in `run_snapshot_daemon`. `None` until
    // the daemon thread has been spawned (see `run_snapshot_daemon`).
    unparker: Option<Unparker>,
    // Most recent snapshot delivered by the application, paired with the
    // condvar used to signal the daemon that a new snapshot (or shutdown)
    // is available.
    current_snapshot: Arc<(Mutex<Snapshot>, Condvar)>,
}
/// Callback the daemon uses to ask the application for a snapshot covering
/// at least the given index.
pub trait RequestSnapshotFnMut: 'static + Send + FnMut(Index) {}
// Blanket impl: any `'static + Send` closure taking an `Index` qualifies.
impl<T: 'static + Send + FnMut(Index)> RequestSnapshotFnMut for T {}
  18. impl SnapshotDaemon {
  19. pub(crate) fn save_snapshot(&self, snapshot: Snapshot) {
  20. let mut curr = self.current_snapshot.0.lock();
  21. if curr.last_included_index < snapshot.last_included_index {
  22. *curr = snapshot;
  23. }
  24. self.current_snapshot.1.notify_one();
  25. }
  26. pub(crate) fn trigger(&self) {
  27. match &self.unparker {
  28. Some(unparker) => unparker.unpark(),
  29. None => {}
  30. }
  31. }
  32. const MIN_SNAPSHOT_INDEX_INTERVAL: usize = 100;
  33. pub(crate) fn log_grow(&self, first_index: Index, last_index: Index) {
  34. if last_index - first_index > Self::MIN_SNAPSHOT_INDEX_INTERVAL {
  35. self.trigger();
  36. }
  37. }
  38. pub(crate) fn kill(&self) {
  39. self.trigger();
  40. // Acquire the lock to make sure the daemon thread either has been
  41. // waiting on the condition, or has not checked `keep_running` yet.
  42. let _ = self.current_snapshot.0.lock();
  43. self.current_snapshot.1.notify_all();
  44. }
  45. }
impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
    /// Stores an application-provided snapshot for the daemon to consume.
    /// Thin delegation to `SnapshotDaemon::save_snapshot`.
    pub fn save_snapshot(&self, snapshot: Snapshot) {
        self.snapshot_daemon.save_snapshot(snapshot)
    }

    /// Spawns the background snapshot daemon thread.
    ///
    /// The daemon parks until woken (`SnapshotDaemon::trigger`). On each
    /// wake-up, if the persisted state size has reached `max_state_size`, it
    /// asks the application for a snapshot via `request_snapshot` and then
    /// compacts the log with it.
    ///
    /// Passing `None` for `max_state_size` disables snapshotting entirely:
    /// no thread is spawned.
    pub(crate) fn run_snapshot_daemon(
        &mut self,
        max_state_size: Option<usize>,
        mut request_snapshot: impl RequestSnapshotFnMut,
    ) {
        let max_state_size = match max_state_size {
            Some(max_state_size) => max_state_size,
            None => return,
        };
        // Parker stays on the daemon thread; the cloned unparker is published
        // through the daemon handle so `trigger()` can wake us.
        let parker = Parker::new();
        let unparker = parker.unparker().clone();
        self.snapshot_daemon.unparker.replace(unparker.clone());
        let keep_running = self.keep_running.clone();
        let rf = self.inner_state.clone();
        let persister = self.persister.clone();
        let snapshot_daemon = self.snapshot_daemon.clone();
        let stop_wait_group = self.stop_wait_group.clone();
        std::thread::spawn(move || loop {
            // Sleep until trigger() / kill() unparks this thread.
            parker.park();
            if !keep_running.load(Ordering::SeqCst) {
                // Shutting down: explicitly drop everything the closure
                // captured before exiting the thread.
                drop(keep_running);
                drop(rf);
                drop(persister);
                drop(snapshot_daemon);
                drop(stop_wait_group);
                break;
            }
            if persister.state_size() >= max_state_size {
                // Remember where the log started so we can detect a
                // concurrent snapshot installation after re-acquiring the
                // lock below.
                let log_start = rf.lock().log.first_index_term();
                let snapshot = {
                    let mut snapshot =
                        snapshot_daemon.current_snapshot.0.lock();
                    if keep_running.load(Ordering::SeqCst)
                        && snapshot.last_included_index <= log_start.index
                    {
                        request_snapshot(log_start.index + 1);
                        // NOTE: deliberately an `if`, not a `while`, around
                        // this wait — waking with a still-stale snapshot
                        // (spurious wakeup, or the notify from `kill()`) is
                        // tolerated: the checks below detect staleness and
                        // retry, and a `while` loop here could miss the
                        // shutdown notification.
                        snapshot_daemon.current_snapshot.1.wait(&mut snapshot);
                    }
                    snapshot.clone()
                };
                let mut rf = rf.lock();
                if rf.log.first_index_term() != log_start {
                    // Another snapshot was installed, let's try again.
                    unparker.unpark();
                    continue;
                }
                if snapshot.last_included_index <= rf.log.start() {
                    // It seems the request_snapshot callback is misbehaving,
                    // let's try again.
                    unparker.unpark();
                    continue;
                }
                if snapshot.last_included_index >= rf.log.end() {
                    // We recently rolled back some of the committed logs. This
                    // can happen but usually the same exact log entries will be
                    // installed in the next AppendEntries request.
                    // There is no need to retry, because when the log entries
                    // are re-committed, we will be notified again.
                    // We will not be notified when the log length changes. Thus
                    // when the log length grows to passing last_included_index
                    // the first time, no snapshot will be taken, although
                    // nothing is preventing it to be done. We will wait until
                    // at least one more entry is committed.
                    continue;
                }
                // Drop entries covered by the snapshot, then persist the new
                // state and snapshot together while still holding the Raft
                // lock, so the persisted pair stays consistent.
                rf.log.shift(snapshot.last_included_index, snapshot.data);
                persister.save_snapshot_and_state(
                    rf.persisted_state().into(),
                    rf.log.snapshot().1,
                );
            }
        });
    }
}