process_append_entries.rs 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. use crate::daemon_env::ErrorKind;
  2. use crate::{
  3. check_or_record, AppendEntriesArgs, AppendEntriesReply, IndexTerm, Raft,
  4. State,
  5. };
  6. // Command must be
  7. // 1. clone: they are copied to the persister.
  8. // 2. serialize: they are converted to bytes to persist.
  9. impl<Command: Clone + serde::Serialize> Raft<Command> {
  10. pub fn process_append_entries(
  11. &self,
  12. args: AppendEntriesArgs<Command>,
  13. ) -> AppendEntriesReply {
  14. // Note: do not change this to `let _ = ...`.
  15. let _guard = self.daemon_env.for_scope();
  16. let mut rf = self.inner_state.lock();
  17. if rf.current_term > args.term {
  18. return AppendEntriesReply {
  19. term: rf.current_term,
  20. success: false,
  21. committed: Some(rf.log.first_after(rf.commit_index).into()),
  22. };
  23. }
  24. if rf.current_term < args.term {
  25. rf.current_term = args.term;
  26. rf.voted_for = None;
  27. self.persister.save_state(rf.persisted_state().into());
  28. }
  29. rf.state = State::Follower;
  30. rf.leader_id = args.leader_id;
  31. self.election.reset_election_timer();
  32. if rf.log.start() > args.prev_log_index
  33. || rf.log.end() <= args.prev_log_index
  34. || rf.log.at(args.prev_log_index).term != args.prev_log_term
  35. {
  36. return AppendEntriesReply {
  37. term: args.term,
  38. success: args.prev_log_index < rf.log.start(),
  39. committed: Some(rf.log.first_after(rf.commit_index).into()),
  40. };
  41. }
  42. let index_matches = args
  43. .entries
  44. .iter()
  45. .enumerate()
  46. .all(|(i, entry)| entry.index == i + args.prev_log_index + 1);
  47. if !index_matches {
  48. let indexes: Vec<IndexTerm> =
  49. args.entries.iter().map(|e| e.into()).collect();
  50. check_or_record!(
  51. index_matches,
  52. ErrorKind::AppendEntriesIndexMismatch(
  53. args.prev_log_index,
  54. indexes
  55. ),
  56. "Entries in AppendEntries request shows index mismatch",
  57. &rf
  58. );
  59. return AppendEntriesReply {
  60. term: args.term,
  61. success: false,
  62. committed: Some(rf.log.first_after(rf.commit_index).into()),
  63. };
  64. }
  65. // COMMIT_INDEX_INVARIANT: Before this loop, we can safely assume that
  66. // commit_index < log.end().
  67. for (i, entry) in args.entries.iter().enumerate() {
  68. let index = i + args.prev_log_index + 1;
  69. if rf.log.end() > index {
  70. if rf.log.at(index).term != entry.term {
  71. check_or_record!(
  72. index > rf.commit_index,
  73. ErrorKind::RollbackCommitted(index),
  74. "Entries before commit index should never be rolled back",
  75. &rf
  76. );
  77. // COMMIT_INDEX_INVARIANT: log.end() shrinks to index. We
  78. // checked that index is strictly larger than commit_index.
  79. // The condition that commit_index < log.end() holds.
  80. rf.log.truncate(index);
  81. rf.log.push(entry.clone());
  82. }
  83. // COMMIT_INDEX_INVARIANT: Otherwise log.end() does not move.
  84. // The condition that commit_index < log.end() holds.
  85. } else {
  86. // COMMIT_INDEX_INVARIANT: log.end() grows larger. The condition
  87. // that commit_index < log.end() holds.
  88. rf.log.push(entry.clone());
  89. }
  90. }
  91. // COMMIT_INDEX_INVARIANT: After the loop, commit_index < log.end()
  92. // must still hold.
  93. self.persister.save_state(rf.persisted_state().into());
  94. // SNAPSHOT_INDEX_INVARIANT: commit_index increases (or stays unchanged)
  95. // after this if-statement. log.start() <= commit_index still holds.
  96. if args.leader_commit > rf.commit_index {
  97. // COMMIT_INDEX_INVARIANT: commit_index < log.end() still holds
  98. // after this assignment.
  99. rf.commit_index = if args.leader_commit < rf.log.end() {
  100. // COMMIT_INDEX_INVARIANT: The if-condition guarantees that
  101. // leader_commit < log.end().
  102. args.leader_commit
  103. } else {
  104. // COMMIT_INDEX_INVARIANT: This is exactly log.end() - 1.
  105. rf.log.last_index_term().index
  106. };
  107. self.apply_command_signal.notify_one();
  108. }
  109. self.snapshot_daemon.log_grow(rf.log.start(), rf.log.end());
  110. AppendEntriesReply {
  111. term: args.term,
  112. success: true,
  113. committed: None,
  114. }
  115. }
  116. }