apply_command.rs 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. use std::sync::atomic::Ordering;
  2. use std::time::Duration;
  3. use crate::{Index, Raft, Snapshot, HEARTBEAT_INTERVAL_MILLIS};
  4. pub enum ApplyCommandMessage<Command> {
  5. Snapshot(Snapshot),
  6. Command(Index, Command),
  7. }
  8. pub trait ApplyCommandFnMut<Command>:
  9. 'static + Send + FnMut(ApplyCommandMessage<Command>)
  10. {
  11. }
  12. impl<Command, T: 'static + Send + FnMut(ApplyCommandMessage<Command>)>
  13. ApplyCommandFnMut<Command> for T
  14. {
  15. }
  16. impl<Command> Raft<Command>
  17. where
  18. Command: 'static + Clone + Send,
  19. {
  20. /// Runs a daemon thread that sends committed log entries to the
  21. /// application via a callback `apply_command`.
  22. ///
  23. /// If we still have the log entries to apply, they will be sent to the
  24. /// application in a loop. Otherwise if the log entries to apply is
  25. /// covered by the current log snapshot, the snapshot will be installed.
  26. ///
  27. /// This daemon guarantees to send log entries and snapshots in increasing
  28. /// order of the log index.
  29. ///
  30. /// No assumption is made about the callback `apply_command`, with a few
  31. /// exceptions.
  32. /// * This daemon does not assume the log entry has been 'accepted' or
  33. /// 'applied' by the application when the callback returns.
  34. ///
  35. /// * The callback can block, although blocking is not recommended. The
  36. /// callback should not block forever, otherwise Raft will fail to shutdown
  37. /// cleanly.
  38. ///
  39. /// * The `apply_command` callback cannot fail. It must keep retrying until
  40. /// the current log entry is 'accepted'. Otherwise the next log entry cannot
  41. /// be delivered to the application.
  42. ///
  43. /// After sending each log entry to the application, this daemon notifies
  44. /// the snapshot daemon that there may be a chance to create a new snapshot.
  45. pub(crate) fn run_apply_command_daemon(
  46. &self,
  47. mut apply_command: impl ApplyCommandFnMut<Command>,
  48. ) {
  49. let keep_running = self.keep_running.clone();
  50. let rf = self.inner_state.clone();
  51. let condvar = self.apply_command_signal.clone();
  52. let snapshot_daemon = self.snapshot_daemon.clone();
  53. let daemon_env = self.daemon_env.clone();
  54. let stop_wait_group = self.stop_wait_group.clone();
  55. let join_handle = std::thread::spawn(move || {
  56. // Note: do not change this to `let _ = ...`.
  57. let _guard = daemon_env.for_scope();
  58. while keep_running.load(Ordering::SeqCst) {
  59. let messages = {
  60. let mut rf = rf.lock();
  61. if rf.last_applied >= rf.commit_index {
  62. // We have applied all committed log entries, wait until
  63. // new log entries are committed.
  64. condvar.wait_for(
  65. &mut rf,
  66. Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS),
  67. );
  68. }
  69. if rf.last_applied < rf.log.start() {
  70. let (index_term, data) = rf.log.snapshot();
  71. let messages =
  72. vec![ApplyCommandMessage::Snapshot(Snapshot {
  73. last_included_index: index_term.index,
  74. data: data.to_vec(),
  75. })];
  76. rf.last_applied = rf.log.start();
  77. messages
  78. } else if rf.last_applied < rf.commit_index {
  79. let index = rf.last_applied + 1;
  80. let last_one = rf.commit_index + 1;
  81. // This is safe because commit_index is always smaller
  82. // than log.end(), see COMMIT_INDEX_INVARIANT.
  83. assert!(last_one <= rf.log.end());
  84. let messages: Vec<ApplyCommandMessage<Command>> = rf
  85. .log
  86. .between(index, last_one)
  87. .iter()
  88. .map(|entry| {
  89. ApplyCommandMessage::Command(
  90. entry.index,
  91. entry.command.clone(),
  92. )
  93. })
  94. .collect();
  95. rf.last_applied = rf.commit_index;
  96. messages
  97. } else {
  98. continue;
  99. }
  100. };
  101. // Release the lock while calling external functions.
  102. for message in messages {
  103. apply_command(message);
  104. snapshot_daemon.trigger();
  105. }
  106. }
  107. drop(stop_wait_group);
  108. });
  109. self.daemon_env.watch_daemon(join_handle);
  110. }
  111. }