//! Snapshot types and the background snapshot (log-compaction) daemon.
  1. use crate::{Index, Raft};
  2. use crossbeam_utils::sync::{Parker, Unparker};
  3. use std::sync::atomic::Ordering;
/// A point-in-time snapshot of the replicated state machine.
pub struct Snapshot {
    /// Index of the last log entry covered by this snapshot.
    pub last_included_index: Index,
    /// Serialized snapshot payload (opaque bytes; format is decided by the
    /// `RequestSnapshotFnMut` callback that produced it).
    pub data: Vec<u8>,
}
/// Handle for waking the background snapshot thread.
///
/// Default-constructed with no `unparker`; the field is populated by
/// `Raft::run_snapshot_daemon` once the daemon thread is spawned.
#[derive(Clone, Debug, Default)]
pub(crate) struct SnapshotDaemon {
    // Wakes the parked daemon thread; `None` while the daemon is not running,
    // in which case `trigger` is a no-op.
    unparker: Option<Unparker>,
}
/// Trait alias for the snapshot-producing callback: given a log index, it
/// returns a [`Snapshot`]. Must be `'static + Send` so it can move into the
/// daemon thread.
pub trait RequestSnapshotFnMut:
    'static + Send + FnMut(Index) -> Snapshot
{
}

/// Blanket impl: every closure with the right signature and bounds is
/// automatically a `RequestSnapshotFnMut`.
impl<T: 'static + Send + FnMut(Index) -> Snapshot> RequestSnapshotFnMut for T {}
  17. impl SnapshotDaemon {
  18. pub(crate) fn trigger(&self) {
  19. match &self.unparker {
  20. Some(unparker) => unparker.unpark(),
  21. None => {}
  22. }
  23. }
  24. }
impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
    /// Spawns the background thread that compacts the log into snapshots.
    ///
    /// The thread parks until woken via `SnapshotDaemon::trigger`. On each
    /// wake-up, if the persisted state has reached `max_state_size`, it asks
    /// `request_snapshot` for a snapshot, shifts the log start forward, and
    /// persists the snapshot together with the state.
    ///
    /// Passing `None` for `max_state_size` disables snapshotting: no thread
    /// is spawned and `trigger` stays a no-op.
    pub(crate) fn run_snapshot_daemon(
        &mut self,
        max_state_size: Option<usize>,
        mut request_snapshot: impl RequestSnapshotFnMut,
    ) {
        // Snapshotting disabled: return early, leaving `unparker` unset.
        let max_state_size = match max_state_size {
            Some(max_state_size) => max_state_size,
            None => return,
        };
        let parker = Parker::new();
        let unparker = parker.unparker().clone();
        // Publish the unparker so trigger() can wake the thread spawned below.
        self.snapshot_daemon.unparker.replace(unparker.clone());
        let keep_running = self.keep_running.clone();
        let rf = self.inner_state.clone();
        let persister = self.persister.clone();
        let stop_wait_group = self.stop_wait_group.clone();
        std::thread::spawn(move || loop {
            // Sleep until trigger() — or a retry self-unpark below — wakes us.
            parker.park();
            if !keep_running.load(Ordering::SeqCst) {
                // Shutdown requested: release the wait group and exit the loop.
                drop(stop_wait_group);
                break;
            }
            if persister.state_size() >= max_state_size {
                // Record the current log start, then run the (potentially
                // slow) callback WITHOUT holding the state lock.
                let log_start = rf.lock().log.first_index_term();
                let snapshot = request_snapshot(log_start.index + 1);
                let mut rf = rf.lock();
                // The lock was released around the callback, so the log may
                // have changed underneath us; re-validate before applying.
                if rf.log.first_index_term() != log_start {
                    // Another snapshot was installed, let's try again.
                    unparker.unpark();
                    continue;
                }
                if snapshot.last_included_index <= rf.log.start() {
                    // It seems the request_snapshot callback is misbehaving,
                    // let's try again.
                    unparker.unpark();
                    continue;
                }
                if snapshot.last_included_index >= rf.log.end() {
                    // We recently rolled back some of the committed logs. This
                    // can happen but usually the same exact log entries will be
                    // installed in the next AppendEntries request.
                    // There is no need to retry, because when the log entries
                    // are re-committed, we will be notified again.
                    continue;
                }
                // Drop log entries covered by the snapshot, then persist the
                // updated state and snapshot together.
                rf.log.shift(snapshot.last_included_index, snapshot.data);
                persister.save_snapshot_and_state(
                    rf.persisted_state().into(),
                    rf.log.snapshot().1,
                );
            }
        });
    }
}
  79. }