apply_command.rs 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. use std::sync::atomic::Ordering;
  2. use std::time::Duration;
  3. use crate::daemon_env::Daemon;
  4. use crate::{Index, Raft, Snapshot, HEARTBEAT_INTERVAL_MILLIS};
  5. pub enum ApplyCommandMessage<Command> {
  6. Snapshot(Snapshot),
  7. Command(Index, Command),
  8. }
  9. pub trait ApplyCommandFnMut<Command>:
  10. 'static + Send + FnMut(ApplyCommandMessage<Command>)
  11. {
  12. }
  13. impl<Command, T: 'static + Send + FnMut(ApplyCommandMessage<Command>)>
  14. ApplyCommandFnMut<Command> for T
  15. {
  16. }
  17. impl<Command> Raft<Command>
  18. where
  19. Command: 'static + Clone + Send,
  20. {
  21. /// Runs a daemon thread that sends committed log entries to the
  22. /// application via a callback `apply_command`.
  23. ///
  24. /// If we still have the log entries to apply, they will be sent to the
  25. /// application in a loop. Otherwise if the log entries to apply is
  26. /// covered by the current log snapshot, the snapshot will be installed.
  27. ///
  28. /// This daemon guarantees to send log entries and snapshots in increasing
  29. /// order of the log index.
  30. ///
  31. /// No assumption is made about the callback `apply_command`, with a few
  32. /// exceptions.
  33. /// * This daemon does not assume the log entry has been 'accepted' or
  34. /// 'applied' by the application when the callback returns.
  35. ///
  36. /// * The callback can block, although blocking is not recommended. The
  37. /// callback should not block forever, otherwise Raft will fail to shutdown
  38. /// cleanly.
  39. ///
  40. /// * The `apply_command` callback cannot fail. It must keep retrying until
  41. /// the current log entry is 'accepted'. Otherwise the next log entry cannot
  42. /// be delivered to the application.
  43. ///
  44. /// After sending each log entry to the application, this daemon notifies
  45. /// the snapshot daemon that there may be a chance to create a new snapshot.
  46. pub(crate) fn run_apply_command_daemon(
  47. &self,
  48. mut apply_command: impl ApplyCommandFnMut<Command>,
  49. ) {
  50. let keep_running = self.keep_running.clone();
  51. let me = self.me;
  52. let rf = self.inner_state.clone();
  53. let condvar = self.apply_command_signal.clone();
  54. let snapshot_daemon = self.snapshot_daemon.clone();
  55. let daemon_env = self.daemon_env.clone();
  56. let stop_wait_group = self.stop_wait_group.clone();
  57. let join_handle = std::thread::spawn(move || {
  58. // Note: do not change this to `let _ = ...`.
  59. let _guard = daemon_env.for_scope();
  60. log::info!("{:?} apply command daemon running ...", me);
  61. while keep_running.load(Ordering::SeqCst) {
  62. let messages = {
  63. let mut rf = rf.lock();
  64. if rf.last_applied >= rf.commit_index {
  65. // We have applied all committed log entries, wait until
  66. // new log entries are committed.
  67. condvar.wait_for(
  68. &mut rf,
  69. Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS),
  70. );
  71. }
  72. // Note that between those two nested `if`s, log start is
  73. // always smaller than or equal to commit index, as
  74. // guaranteed by the SNAPSHOT_INDEX_INVARIANT.
  75. assert!(rf.log.start() <= rf.commit_index);
  76. if rf.last_applied < rf.log.start() {
  77. let (index_term, data) = rf.log.snapshot();
  78. let messages =
  79. vec![ApplyCommandMessage::Snapshot(Snapshot {
  80. last_included_index: index_term.index,
  81. data: data.to_vec(),
  82. })];
  83. rf.last_applied = rf.log.start();
  84. messages
  85. } else if rf.last_applied < rf.commit_index {
  86. let index = rf.last_applied + 1;
  87. let last_one = rf.commit_index + 1;
  88. // This is safe because commit_index is always smaller
  89. // than log.end(), see COMMIT_INDEX_INVARIANT.
  90. assert!(last_one <= rf.log.end());
  91. let messages: Vec<ApplyCommandMessage<Command>> = rf
  92. .log
  93. .between(index, last_one)
  94. .iter()
  95. .map(|entry| {
  96. ApplyCommandMessage::Command(
  97. entry.index,
  98. entry.command.clone(),
  99. )
  100. })
  101. .collect();
  102. rf.last_applied = rf.commit_index;
  103. messages
  104. } else {
  105. continue;
  106. }
  107. };
  108. // Release the lock while calling external functions.
  109. for message in messages {
  110. apply_command(message);
  111. snapshot_daemon.trigger();
  112. }
  113. }
  114. log::info!("{:?} apply command daemon done.", me);
  115. drop(stop_wait_group);
  116. });
  117. self.daemon_env
  118. .watch_daemon(Daemon::ApplyCommand, join_handle);
  119. }
  120. }