process_append_entries.rs 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. use crate::daemon_env::ErrorKind;
  2. use crate::{
  3. check_or_record, AppendEntriesArgs, AppendEntriesReply, IndexTerm, Raft,
  4. };
  5. // Command must be
  6. // 1. clone: they are copied to the persister.
  7. // 2. serialize: they are converted to bytes to persist.
  8. impl<Command: Clone + serde::Serialize> Raft<Command> {
  9. pub fn process_append_entries(
  10. &self,
  11. args: AppendEntriesArgs<Command>,
  12. ) -> AppendEntriesReply {
  13. // Note: do not change this to `let _ = ...`.
  14. let _guard = self.daemon_env.for_scope();
  15. let mut rf = self.inner_state.lock();
  16. if rf.current_term > args.term {
  17. return AppendEntriesReply {
  18. term: rf.current_term,
  19. success: false,
  20. committed: Some(rf.log.first_after(rf.commit_index).into()),
  21. };
  22. }
  23. if rf.current_term < args.term {
  24. rf.current_term = args.term;
  25. rf.voted_for = None;
  26. self.persister.save_state(rf.persisted_state().into());
  27. }
  28. rf.meet_leader(args.leader_id);
  29. self.election.reset_election_timer();
  30. if rf.log.start() > args.prev_log_index
  31. || rf.log.end() <= args.prev_log_index
  32. || rf.log.at(args.prev_log_index).term != args.prev_log_term
  33. {
  34. return AppendEntriesReply {
  35. term: args.term,
  36. success: args.prev_log_index < rf.log.start(),
  37. committed: Some(rf.log.first_after(rf.commit_index).into()),
  38. };
  39. }
  40. let index_matches = args
  41. .entries
  42. .iter()
  43. .enumerate()
  44. .all(|(i, entry)| entry.index == i + args.prev_log_index + 1);
  45. if !index_matches {
  46. let indexes: Vec<IndexTerm> =
  47. args.entries.iter().map(|e| e.into()).collect();
  48. check_or_record!(
  49. index_matches,
  50. ErrorKind::AppendEntriesIndexMismatch(
  51. args.prev_log_index,
  52. indexes
  53. ),
  54. "Entries in AppendEntries request shows index mismatch",
  55. &rf
  56. );
  57. return AppendEntriesReply {
  58. term: args.term,
  59. success: false,
  60. committed: Some(rf.log.first_after(rf.commit_index).into()),
  61. };
  62. }
  63. // COMMIT_INDEX_INVARIANT: Before this loop, we can safely assume that
  64. // commit_index < log.end().
  65. for (i, entry) in args.entries.iter().enumerate() {
  66. let index = i + args.prev_log_index + 1;
  67. if rf.log.end() > index {
  68. if rf.log.at(index).term != entry.term {
  69. check_or_record!(
  70. index > rf.commit_index,
  71. ErrorKind::RollbackCommitted(index),
  72. "Entries before commit index should never be rolled back",
  73. &rf
  74. );
  75. // COMMIT_INDEX_INVARIANT: log.end() shrinks to index. We
  76. // checked that index is strictly larger than commit_index.
  77. // The condition that commit_index < log.end() holds.
  78. rf.log.truncate(index);
  79. rf.log.push(entry.clone());
  80. }
  81. // COMMIT_INDEX_INVARIANT: Otherwise log.end() does not move.
  82. // The condition that commit_index < log.end() holds.
  83. } else {
  84. // COMMIT_INDEX_INVARIANT: log.end() grows larger. The condition
  85. // that commit_index < log.end() holds.
  86. rf.log.push(entry.clone());
  87. }
  88. }
  89. // COMMIT_INDEX_INVARIANT: After the loop, commit_index < log.end()
  90. // must still hold.
  91. self.persister.save_state(rf.persisted_state().into());
  92. // SNAPSHOT_INDEX_INVARIANT: commit_index increases (or stays unchanged)
  93. // after this if-statement. log.start() <= commit_index still holds.
  94. if args.leader_commit > rf.commit_index {
  95. // COMMIT_INDEX_INVARIANT: commit_index < log.end() still holds
  96. // after this assignment.
  97. rf.commit_index = if args.leader_commit < rf.log.end() {
  98. // COMMIT_INDEX_INVARIANT: The if-condition guarantees that
  99. // leader_commit < log.end().
  100. args.leader_commit
  101. } else {
  102. // COMMIT_INDEX_INVARIANT: This is exactly log.end() - 1.
  103. rf.log.last_index_term().index
  104. };
  105. self.apply_command_signal.notify_one();
  106. }
  107. self.snapshot_daemon.log_grow(rf.log.start(), rf.log.end());
  108. AppendEntriesReply {
  109. term: args.term,
  110. success: true,
  111. committed: None,
  112. }
  113. }
  114. }