process_append_entries.rs 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. use crate::daemon_env::ErrorKind;
  2. use crate::{
  3. check_or_record, AppendEntriesArgs, AppendEntriesReply, Raft, State,
  4. };
  5. // Command must be
  6. // 1. clone: they are copied to the persister.
  7. // 2. serialize: they are converted to bytes to persist.
  8. // 3. default: a default value is used as the first element of the log.
  9. impl<Command> Raft<Command>
  10. where
  11. Command: Clone + serde::Serialize + Default,
  12. {
  13. pub(crate) fn process_append_entries(
  14. &self,
  15. args: AppendEntriesArgs<Command>,
  16. ) -> AppendEntriesReply {
  17. // Note: do not change this to `let _ = ...`.
  18. let _guard = self.daemon_env.for_scope();
  19. let mut rf = self.inner_state.lock();
  20. if rf.current_term > args.term {
  21. return AppendEntriesReply {
  22. term: rf.current_term,
  23. success: false,
  24. committed: Some(rf.log.first_after(rf.commit_index).into()),
  25. };
  26. }
  27. if rf.current_term < args.term {
  28. rf.current_term = args.term;
  29. rf.voted_for = None;
  30. self.persister.save_state(rf.persisted_state().into());
  31. }
  32. rf.state = State::Follower;
  33. rf.leader_id = args.leader_id;
  34. self.election.reset_election_timer();
  35. if rf.log.start() > args.prev_log_index
  36. || rf.log.end() <= args.prev_log_index
  37. || rf.log[args.prev_log_index].term != args.prev_log_term
  38. {
  39. return AppendEntriesReply {
  40. term: args.term,
  41. success: args.prev_log_index < rf.log.start(),
  42. committed: Some(rf.log.first_after(rf.commit_index).into()),
  43. };
  44. }
  45. // COMMIT_INDEX_INVARIANT: Before this loop, we can safely assume that
  46. // commit_index < log.end().
  47. for (i, entry) in args.entries.iter().enumerate() {
  48. let index = i + args.prev_log_index + 1;
  49. if rf.log.end() > index {
  50. if rf.log[index].term != entry.term {
  51. check_or_record!(
  52. index > rf.commit_index,
  53. ErrorKind::RollbackCommitted(index),
  54. "Entries before commit index should never be rolled back",
  55. &rf
  56. );
  57. // COMMIT_INDEX_INVARIANT: log.end() shrinks to index. We
  58. // checked that index is strictly larger than commit_index.
  59. // The condition that commit_index < log.end() holds.
  60. rf.log.truncate(index);
  61. rf.log.push(entry.clone());
  62. }
  63. // COMMIT_INDEX_INVARIANT: Otherwise log.end() does not move.
  64. // The condition that commit_index < log.end() holds.
  65. } else {
  66. // COMMIT_INDEX_INVARIANT: log.end() grows larger. The condition
  67. // that commit_index < log.end() holds.
  68. rf.log.push(entry.clone());
  69. }
  70. }
  71. // COMMIT_INDEX_INVARIANT: After the loop, commit_index < log.end()
  72. // must still hold.
  73. self.persister.save_state(rf.persisted_state().into());
  74. // SNAPSHOT_INDEX_INVARIANT: commit_index increases (or stays unchanged)
  75. // after this if-statement. log.start() <= commit_index still holds.
  76. if args.leader_commit > rf.commit_index {
  77. // COMMIT_INDEX_INVARIANT: commit_index < log.end() still holds
  78. // after this assignment.
  79. rf.commit_index = if args.leader_commit < rf.log.end() {
  80. // COMMIT_INDEX_INVARIANT: The if-condition guarantees that
  81. // leader_commit < log.end().
  82. args.leader_commit
  83. } else {
  84. // COMMIT_INDEX_INVARIANT: This is exactly log.end() - 1.
  85. rf.log.last_index_term().index
  86. };
  87. self.apply_command_signal.notify_one();
  88. }
  89. self.snapshot_daemon.log_grow(rf.log.start(), rf.log.end());
  90. AppendEntriesReply {
  91. term: args.term,
  92. success: true,
  93. committed: None,
  94. }
  95. }
  96. }