heartbeats.rs 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. use std::sync::atomic::{AtomicU64, Ordering};
  2. use std::sync::Arc;
  3. use std::time::{Duration, Instant};
  4. use parking_lot::Mutex;
  5. use crate::remote_context::RemoteContext;
  6. use crate::utils::{retry_rpc, RPC_DEADLINE};
  7. use crate::{AppendEntriesArgs, Peer, Raft, RaftState, ReplicableCommand};
  /// Heartbeat interval: 150 milliseconds.
  ///
  /// NOTE(review): not referenced by the code visible in this file; presumably
  /// the `interval` handed to `Raft::schedule_heartbeats` -- confirm at the
  /// call site.
  pub(crate) const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(150);
  9. #[derive(Clone)]
  10. pub(crate) struct HeartbeatsDaemon {
  11. start: Instant,
  12. last_trigger: Arc<AtomicU64>,
  13. sender: tokio::sync::broadcast::Sender<()>,
  14. }
  15. impl HeartbeatsDaemon {
  16. const HEARTBEAT_MAX_DELAY_MILLIS: u64 = 30;
  17. pub fn create() -> Self {
  18. let (sender, _) = tokio::sync::broadcast::channel(1);
  19. Self {
  20. start: Instant::now(),
  21. last_trigger: Arc::new(AtomicU64::new(0)),
  22. sender,
  23. }
  24. }
  25. pub fn trigger(&self, force: bool) {
  26. let now = self.start.elapsed().as_millis();
  27. // u64 is big enough for more than 500 million years.
  28. let now_lower_bits = (now & (u64::MAX) as u128) as u64;
  29. let last_trigger = self.last_trigger.load(Ordering::Acquire);
  30. let next_trigger =
  31. last_trigger.wrapping_add(Self::HEARTBEAT_MAX_DELAY_MILLIS);
  32. // Do not trigger heartbeats too frequently, unless we are forced.
  33. if force || next_trigger < now_lower_bits {
  34. let previous_trigger = self
  35. .last_trigger
  36. .fetch_max(now_lower_bits, Ordering::AcqRel);
  37. if last_trigger == previous_trigger {
  38. let _ = self.sender.send(());
  39. }
  40. }
  41. }
  42. }
  43. // Command must be
  44. // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
  45. // 1. clone: they are copied to the persister.
  46. // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
  47. // 3. serialize: they are converted to bytes to persist.
  48. impl<Command: ReplicableCommand> Raft<Command> {
  49. /// Schedules tasks that send heartbeats to peers.
  50. ///
  51. /// One task is scheduled for each peer. The task sleeps for a duration
  52. /// specified by `interval`, wakes up, builds the request message to send
  53. /// and delegates the actual RPC-sending to another task before going back
  54. /// to sleep.
  55. ///
  56. /// The sleeping task does nothing if we are not the leader.
  57. ///
  58. /// The request message is a stripped down version of `AppendEntries`. The
  59. /// response from the peer is ignored.
  60. pub(crate) fn schedule_heartbeats(&self, interval: Duration) {
  61. // rf is now owned by the outer async function.
  62. let rf = self.inner_state.clone();
  63. // A on-demand trigger to sending a heartbeat.
  64. let mut trigger = self.heartbeats_daemon.sender.subscribe();
  65. // Shutdown signal.
  66. let keep_running = self.keep_running.clone();
  67. let peers = self.peers.clone();
  68. self.thread_pool.spawn(async move {
  69. let mut interval = tokio::time::interval(interval);
  70. while keep_running.load(Ordering::Relaxed) {
  71. let tick = interval.tick();
  72. let trigger = trigger.recv();
  73. futures_util::pin_mut!(tick, trigger);
  74. let _ = futures_util::future::select(tick, trigger).await;
  75. if let Some(args) = Self::build_heartbeat(&rf) {
  76. for peer in &peers {
  77. tokio::spawn(Self::send_heartbeat(*peer, args.clone()));
  78. }
  79. }
  80. }
  81. });
  82. }
  83. fn build_heartbeat(
  84. rf: &Mutex<RaftState<Command>>,
  85. ) -> Option<AppendEntriesArgs<Command>> {
  86. let rf = rf.lock();
  87. if !rf.is_leader() {
  88. return None;
  89. }
  90. let last_log = rf.log.last_index_term();
  91. let args = AppendEntriesArgs {
  92. term: rf.current_term,
  93. leader_id: rf.leader_id,
  94. prev_log_index: last_log.index,
  95. prev_log_term: last_log.term,
  96. entries: vec![],
  97. leader_commit: rf.commit_index,
  98. };
  99. Some(args)
  100. }
  101. const HEARTBEAT_RETRY: usize = 1;
  102. async fn send_heartbeat(
  103. peer: Peer,
  104. args: AppendEntriesArgs<Command>,
  105. ) -> std::io::Result<()> {
  106. let term = args.term;
  107. let beat_ticker = RemoteContext::<Command>::beat_ticker(peer);
  108. let beat = beat_ticker.next_beat();
  109. // Passing a reference that is moved to the following closure.
  110. //
  111. // It won't work if the rpc_client of type Arc is moved into the closure
  112. // directly. To clone the Arc, the function must own a mutable reference
  113. // to it. At the same time, rpc_client.call_append_entries() returns a
  114. // future that must own a reference, too. That caused a compiling error
  115. // of FnMut allowing "references to captured variables to escape".
  116. //
  117. // By passing-in a reference instead of an Arc, the closure becomes a Fn
  118. // (no Mut), which can allow references to escape.
  119. //
  120. // Another option is to use non-move closures, in which case rpc_client
  121. // of type Arc can be passed-in directly. However that requires args to
  122. // be sync because they can be shared by more than one futures.
  123. let rpc_client = RemoteContext::<Command>::rpc_client(peer);
  124. let response =
  125. retry_rpc(Self::HEARTBEAT_RETRY, RPC_DEADLINE, move |_round| {
  126. rpc_client.append_entries(args.clone())
  127. })
  128. .await?;
  129. if term == response.term {
  130. beat_ticker.tick(beat);
  131. } else {
  132. RemoteContext::<Command>::term_marker().mark(response.term);
  133. }
  134. Ok(())
  135. }
  136. }