apply_command.rs 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. use std::sync::atomic::Ordering;
  2. use std::time::Duration;
  3. use crate::{Index, Raft, Snapshot, HEARTBEAT_INTERVAL_MILLIS};
  4. pub enum ApplyCommandMessage<Command> {
  5. Snapshot(Snapshot),
  6. Command(Index, Command),
  7. }
  8. pub trait ApplyCommandFnMut<Command>:
  9. 'static + Send + FnMut(ApplyCommandMessage<Command>)
  10. {
  11. }
  12. impl<Command, T: 'static + Send + FnMut(ApplyCommandMessage<Command>)>
  13. ApplyCommandFnMut<Command> for T
  14. {
  15. }
  16. impl<Command> Raft<Command>
  17. where
  18. Command: 'static + Clone + Send,
  19. {
  20. pub(crate) fn run_apply_command_daemon(
  21. &self,
  22. mut apply_command: impl ApplyCommandFnMut<Command>,
  23. ) {
  24. let keep_running = self.keep_running.clone();
  25. let rf = self.inner_state.clone();
  26. let condvar = self.apply_command_signal.clone();
  27. let snapshot_daemon = self.snapshot_daemon.clone();
  28. let stop_wait_group = self.stop_wait_group.clone();
  29. let join_handle = std::thread::spawn(move || {
  30. while keep_running.load(Ordering::SeqCst) {
  31. let messages = {
  32. let mut rf = rf.lock();
  33. if rf.last_applied >= rf.commit_index {
  34. condvar.wait_for(
  35. &mut rf,
  36. Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS),
  37. );
  38. }
  39. if rf.last_applied < rf.log.start() {
  40. let (index_term, data) = rf.log.snapshot();
  41. let messages =
  42. vec![ApplyCommandMessage::Snapshot(Snapshot {
  43. last_included_index: index_term.index,
  44. data: data.to_vec(),
  45. })];
  46. rf.last_applied = rf.log.start();
  47. messages
  48. } else if rf.last_applied < rf.commit_index {
  49. let index = rf.last_applied + 1;
  50. let last_one = rf.commit_index + 1;
  51. let messages: Vec<ApplyCommandMessage<Command>> = rf
  52. .log
  53. .between(index, last_one)
  54. .iter()
  55. .map(|entry| {
  56. ApplyCommandMessage::Command(
  57. entry.index,
  58. entry.command.clone(),
  59. )
  60. })
  61. .collect();
  62. rf.last_applied = rf.commit_index;
  63. messages
  64. } else {
  65. continue;
  66. }
  67. };
  68. // Release the lock while calling external functions.
  69. for message in messages {
  70. apply_command(message);
  71. snapshot_daemon.trigger();
  72. }
  73. }
  74. drop(stop_wait_group);
  75. });
  76. self.daemon_env.watch_daemon(join_handle);
  77. }
  78. }