install_snapshot.rs 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. use crate::utils::retry_rpc;
  2. use crate::{
  3. Index, Peer, Raft, RaftState, RpcClient, State, Term, RPC_DEADLINE,
  4. };
  5. #[derive(Clone, Debug, Serialize, Deserialize)]
  6. pub(crate) struct InstallSnapshotArgs {
  7. pub(crate) term: Term,
  8. leader_id: Peer,
  9. pub(crate) last_included_index: Index,
  10. last_included_term: Term,
  11. // TODO(ditsing): Serde cannot handle Vec<u8> as efficient as expected.
  12. data: Vec<u8>,
  13. offset: usize,
  14. done: bool,
  15. }
  16. #[derive(Clone, Debug, Serialize, Deserialize)]
  17. pub(crate) struct InstallSnapshotReply {
  18. term: Term,
  19. }
  20. impl<C: Clone + Default + serde::Serialize> Raft<C> {
  21. pub(crate) fn process_install_snapshot(
  22. &self,
  23. args: InstallSnapshotArgs,
  24. ) -> InstallSnapshotReply {
  25. if args.offset != 0 || !args.done {
  26. panic!("Current implementation cannot handle segmented snapshots.")
  27. }
  28. let mut rf = self.inner_state.lock();
  29. if rf.current_term > args.term {
  30. return InstallSnapshotReply {
  31. term: rf.current_term,
  32. };
  33. }
  34. if rf.current_term < args.term {
  35. rf.current_term = args.term;
  36. rf.voted_for = None;
  37. self.persister.save_state(rf.persisted_state().into());
  38. }
  39. rf.state = State::Follower;
  40. rf.leader_id = args.leader_id;
  41. self.election.reset_election_timer();
  42. // The above code is exactly the same as AppendEntries.
  43. if args.last_included_index < rf.log.end()
  44. && args.last_included_index >= rf.log.start()
  45. && args.last_included_term == rf.log[args.last_included_index].term
  46. {
  47. rf.log.shift(args.last_included_index, args.data);
  48. } else {
  49. rf.log.reset(
  50. args.last_included_index,
  51. args.last_included_term,
  52. args.data,
  53. );
  54. }
  55. // The length of the log might shrink.
  56. let last_log_index = rf.log.last_index_term().index;
  57. if rf.commit_index > last_log_index {
  58. rf.commit_index = last_log_index;
  59. }
  60. self.persister.save_snapshot_and_state(
  61. rf.persisted_state().into(),
  62. rf.log.snapshot().1,
  63. );
  64. self.apply_command_signal.notify_one();
  65. InstallSnapshotReply { term: args.term }
  66. }
  67. pub(crate) fn build_install_snapshot(
  68. rf: &RaftState<C>,
  69. ) -> InstallSnapshotArgs {
  70. let (last, snapshot) = rf.log.snapshot();
  71. InstallSnapshotArgs {
  72. term: rf.current_term,
  73. leader_id: rf.leader_id,
  74. last_included_index: last.index,
  75. last_included_term: last.term,
  76. data: snapshot.to_owned(),
  77. offset: 0,
  78. done: true,
  79. }
  80. }
  81. const INSTALL_SNAPSHOT_RETRY: usize = 1;
  82. pub(crate) async fn send_install_snapshot(
  83. rpc_client: &RpcClient,
  84. args: InstallSnapshotArgs,
  85. ) -> std::io::Result<Option<bool>> {
  86. let term = args.term;
  87. let reply = retry_rpc(
  88. Self::INSTALL_SNAPSHOT_RETRY,
  89. RPC_DEADLINE,
  90. move |_round| rpc_client.call_install_snapshot(args.clone()),
  91. )
  92. .await?;
  93. Ok(if reply.term == term { Some(true) } else { None })
  94. }
  95. }