install_snapshot.rs 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. use crate::index_term::IndexTerm;
  2. use crate::utils::retry_rpc;
  3. use crate::{
  4. Index, Peer, Raft, RaftState, RpcClient, State, SyncLogEntryResult, Term,
  5. RPC_DEADLINE,
  6. };
  7. #[derive(Clone, Debug, Serialize, Deserialize)]
  8. pub(crate) struct InstallSnapshotArgs {
  9. pub(crate) term: Term,
  10. leader_id: Peer,
  11. pub(crate) last_included_index: Index,
  12. last_included_term: Term,
  13. // TODO(ditsing): Serde cannot handle Vec<u8> as efficient as expected.
  14. data: Vec<u8>,
  15. offset: usize,
  16. done: bool,
  17. }
  18. #[derive(Clone, Debug, Serialize, Deserialize)]
  19. pub(crate) struct InstallSnapshotReply {
  20. term: Term,
  21. committed: Option<IndexTerm>,
  22. }
  23. impl<C: Clone + Default + serde::Serialize> Raft<C> {
  24. pub(crate) fn process_install_snapshot(
  25. &self,
  26. args: InstallSnapshotArgs,
  27. ) -> InstallSnapshotReply {
  28. if args.offset != 0 || !args.done {
  29. panic!("Current implementation cannot handle segmented snapshots.")
  30. }
  31. let mut rf = self.inner_state.lock();
  32. if rf.current_term > args.term {
  33. return InstallSnapshotReply {
  34. term: rf.current_term,
  35. committed: None,
  36. };
  37. }
  38. if rf.current_term < args.term {
  39. rf.current_term = args.term;
  40. rf.voted_for = None;
  41. self.persister.save_state(rf.persisted_state().into());
  42. }
  43. rf.state = State::Follower;
  44. rf.leader_id = args.leader_id;
  45. self.election.reset_election_timer();
  46. // The above code is exactly the same as AppendEntries.
  47. // The snapshot could not be verified because the index is beyond log
  48. // start. Fail this request and ask leader to send something that we
  49. // could verify. We cannot rollback to a point beyond commit index
  50. // anyway. Otherwise if the system fails right after the rollback,
  51. // committed entries before log start would be lost forever.
  52. //
  53. // The commit index is sent back to leader. The leader would never need
  54. // to rollback beyond that, since it is guaranteed that committed log
  55. // entries will never be rolled back.
  56. if args.last_included_index < rf.log.start() {
  57. return InstallSnapshotReply {
  58. term: args.term,
  59. committed: Some(rf.log.first_after(rf.commit_index).into()),
  60. };
  61. }
  62. if args.last_included_index < rf.log.end()
  63. && args.last_included_index >= rf.log.start()
  64. && args.last_included_term == rf.log[args.last_included_index].term
  65. {
  66. // Do nothing if the index and term match the current snapshot.
  67. if args.last_included_index != rf.log.start() {
  68. if rf.commit_index < args.last_included_index {
  69. rf.commit_index = args.last_included_index;
  70. }
  71. rf.log.shift(args.last_included_index, args.data);
  72. }
  73. } else {
  74. assert!(args.last_included_index > rf.commit_index);
  75. rf.commit_index = args.last_included_index;
  76. rf.log.reset(
  77. args.last_included_index,
  78. args.last_included_term,
  79. args.data,
  80. );
  81. }
  82. self.persister.save_snapshot_and_state(
  83. rf.persisted_state().into(),
  84. rf.log.snapshot().1,
  85. );
  86. self.apply_command_signal.notify_one();
  87. InstallSnapshotReply {
  88. term: args.term,
  89. committed: None,
  90. }
  91. }
  92. pub(crate) fn build_install_snapshot(
  93. rf: &RaftState<C>,
  94. ) -> InstallSnapshotArgs {
  95. let (last, snapshot) = rf.log.snapshot();
  96. InstallSnapshotArgs {
  97. term: rf.current_term,
  98. leader_id: rf.leader_id,
  99. last_included_index: last.index,
  100. last_included_term: last.term,
  101. data: snapshot.to_owned(),
  102. offset: 0,
  103. done: true,
  104. }
  105. }
  106. const INSTALL_SNAPSHOT_RETRY: usize = 1;
  107. pub(crate) async fn send_install_snapshot(
  108. rpc_client: &RpcClient,
  109. args: InstallSnapshotArgs,
  110. ) -> std::io::Result<SyncLogEntryResult> {
  111. let term = args.term;
  112. let reply = retry_rpc(
  113. Self::INSTALL_SNAPSHOT_RETRY,
  114. RPC_DEADLINE,
  115. move |_round| rpc_client.call_install_snapshot(args.clone()),
  116. )
  117. .await?;
  118. Ok(if reply.term == term {
  119. if let Some(committed) = reply.committed {
  120. SyncLogEntryResult::Archived(committed)
  121. } else {
  122. SyncLogEntryResult::Success
  123. }
  124. } else {
  125. SyncLogEntryResult::TermElapsed(reply.term)
  126. })
  127. }
  128. }