snapshot.rs 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. use crate::{Index, Raft};
  2. use crossbeam_utils::sync::{Parker, Unparker};
  3. use std::sync::atomic::Ordering;
/// A point-in-time snapshot of the replicated state, covering every log
/// entry up to and including `last_included_index`.
#[derive(Clone, Debug, Default)]
pub struct Snapshot {
    /// Index of the last log entry whose effect is captured in `data`.
    pub last_included_index: Index,
    /// Opaque snapshot payload produced by the application's snapshot
    /// callback; stored alongside the log and persisted by the daemon.
    pub data: Vec<u8>,
}
/// Handle for waking the background snapshot thread.
///
/// `unparker` stays `None` until `run_snapshot_daemon` spawns the thread
/// and installs the `Unparker` paired with the thread's `Parker`; until
/// then `trigger()` is a no-op.
#[derive(Clone, Debug, Default)]
pub(crate) struct SnapshotDaemon {
    // Wakes the daemon thread parked in run_snapshot_daemon's loop.
    unparker: Option<Unparker>,
}
/// Callback that builds a [`Snapshot`] of the application state covering at
/// least the given index.
///
/// This is an alias-trait: the blanket impl below makes every
/// `'static + Send + FnMut(Index) -> Snapshot` closure satisfy it, so the
/// trait exists purely as a shorthand for that bound.
pub trait RequestSnapshotFnMut:
    'static + Send + FnMut(Index) -> Snapshot
{
}
impl<T: 'static + Send + FnMut(Index) -> Snapshot> RequestSnapshotFnMut for T {}
  18. impl SnapshotDaemon {
  19. pub(crate) fn trigger(&self) {
  20. match &self.unparker {
  21. Some(unparker) => unparker.unpark(),
  22. None => {}
  23. }
  24. }
  25. }
impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
    /// Spawns the snapshot daemon: a background thread that sleeps until
    /// woken by [`SnapshotDaemon::trigger`], and whenever the persisted
    /// state exceeds `max_state_size`, requests a snapshot from the
    /// application via `request_snapshot` and compacts the log to it.
    ///
    /// If `max_state_size` is `None`, snapshotting is disabled: no thread
    /// is spawned and no unparker is installed.
    pub(crate) fn run_snapshot_daemon(
        &mut self,
        max_state_size: Option<usize>,
        mut request_snapshot: impl RequestSnapshotFnMut,
    ) {
        let max_state_size = match max_state_size {
            Some(max_state_size) => max_state_size,
            None => return,
        };
        let parker = Parker::new();
        let unparker = parker.unparker().clone();
        // Publish the unparker so SnapshotDaemon::trigger() can wake the
        // thread spawned below.
        self.snapshot_daemon.unparker.replace(unparker.clone());
        // Clone the shared handles that the daemon thread needs to own.
        let keep_running = self.keep_running.clone();
        let rf = self.inner_state.clone();
        let persister = self.persister.clone();
        let stop_wait_group = self.stop_wait_group.clone();
        std::thread::spawn(move || loop {
            // Block until trigger() — or a retry branch below — unparks us.
            parker.park();
            if !keep_running.load(Ordering::SeqCst) {
                // Shutting down: explicitly drop everything the thread
                // holds before exiting the loop.
                drop(keep_running);
                drop(rf);
                drop(persister);
                drop(stop_wait_group);
                break;
            }
            if persister.state_size() >= max_state_size {
                // Record the current log start, then release the lock while
                // running the (potentially slow) application callback.
                let log_start = rf.lock().log.first_index_term();
                let snapshot = request_snapshot(log_start.index + 1);
                // Re-acquire the lock and validate that the log start is
                // unchanged before applying the snapshot.
                let mut rf = rf.lock();
                if rf.log.first_index_term() != log_start {
                    // Another snapshot was installed, let's try again.
                    unparker.unpark();
                    continue;
                }
                if snapshot.last_included_index <= rf.log.start() {
                    // It seems the request_snapshot callback is misbehaving,
                    // let's try again.
                    unparker.unpark();
                    continue;
                }
                if snapshot.last_included_index >= rf.log.end() {
                    // We recently rolled back some of the committed logs. This
                    // can happen but usually the same exact log entries will be
                    // installed in the next AppendEntries request.
                    // There is no need to retry, because when the log entries
                    // are re-committed, we will be notified again.
                    // We will not be notified when the log length changes. Thus
                    // when the log length grows to passing last_included_index
                    // the first time, no snapshot will be taken, although
                    // nothing is preventing it to be done. We will wait until
                    // at least one more entry is committed.
                    continue;
                }
                // Compact the log up to the snapshot point, then persist the
                // new state and snapshot together.
                rf.log.shift(snapshot.last_included_index, snapshot.data);
                persister.save_snapshot_and_state(
                    rf.persisted_state().into(),
                    rf.log.snapshot().1,
                );
            }
        });
    }
}