install_snapshot.rs 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. use crate::{Index, Peer, Raft, RaftState, State, Term};
  2. use parking_lot::Mutex;
  3. #[derive(Clone, Debug, Serialize, Deserialize)]
  4. pub(crate) struct InstallSnapshotArgs {
  5. term: Term,
  6. leader_id: Peer,
  7. last_included_index: Index,
  8. last_included_term: Term,
  9. // TODO(ditsing): this seems less efficient.
  10. data: Vec<u8>,
  11. offset: usize,
  12. done: bool,
  13. }
  14. pub(crate) struct InstallSnapshotReply {
  15. term: Term,
  16. }
  17. impl<C: Clone + Default + serde::Serialize> Raft<C> {
  18. pub(crate) fn process_install_snapshot(
  19. &self,
  20. args: InstallSnapshotArgs,
  21. ) -> InstallSnapshotReply {
  22. if args.offset != 0 || !args.done {
  23. panic!("Current implementation cannot handle segmented snapshots.")
  24. }
  25. let mut rf = self.inner_state.lock();
  26. if rf.current_term > args.term {
  27. return InstallSnapshotReply {
  28. term: rf.current_term,
  29. };
  30. }
  31. if rf.current_term < args.term {
  32. rf.current_term = args.term;
  33. rf.voted_for = None;
  34. self.persister.save_state(rf.persisted_state().into());
  35. }
  36. rf.state = State::Follower;
  37. rf.leader_id = args.leader_id;
  38. self.election.reset_election_timer();
  39. // The above code is exactly the same as AppendEntries.
  40. if args.last_included_index < rf.log.end()
  41. && args.last_included_index >= rf.log.start()
  42. && args.last_included_term == rf.log[args.last_included_index].term
  43. {
  44. rf.log.shift(args.last_included_index, args.data);
  45. } else {
  46. rf.log.reset(
  47. args.last_included_index,
  48. args.last_included_term,
  49. args.data,
  50. );
  51. }
  52. // The length of the log might shrink.
  53. let last_log_index = rf.log.last_index_term().index;
  54. if rf.commit_index > last_log_index {
  55. rf.commit_index = last_log_index;
  56. }
  57. self.persister.save_state(bytes::Bytes::new()); // TODO(ditsing)
  58. self.apply_command_signal.notify_one();
  59. InstallSnapshotReply { term: args.term }
  60. }
  61. fn build_install_snapshot(
  62. rf: &Mutex<RaftState<C>>,
  63. ) -> Option<InstallSnapshotArgs> {
  64. let rf = rf.lock();
  65. if !rf.is_leader() {
  66. return None;
  67. }
  68. let (last, snapshot) = rf.log.snapshot();
  69. Some(InstallSnapshotArgs {
  70. term: rf.current_term,
  71. leader_id: rf.leader_id,
  72. last_included_index: last.index,
  73. last_included_term: last.term,
  74. data: snapshot.to_owned(),
  75. offset: 0,
  76. done: true,
  77. })
  78. }
  79. }