heartbeats.rs 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. use std::sync::atomic::{AtomicU64, Ordering};
  2. use std::sync::Arc;
  3. use std::time::{Duration, Instant};
  4. use parking_lot::Mutex;
  5. use crate::term_marker::TermMarker;
  6. use crate::utils::{retry_rpc, RPC_DEADLINE};
  7. use crate::verify_authority::DaemonBeatTicker;
  8. use crate::{
  9. AppendEntriesArgs, Raft, RaftState, RemoteRaft, ReplicableCommand,
  10. };
  11. pub(crate) const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(150);
  12. #[derive(Clone)]
  13. pub(crate) struct HeartbeatsDaemon {
  14. start: Instant,
  15. last_trigger: Arc<AtomicU64>,
  16. sender: tokio::sync::broadcast::Sender<()>,
  17. }
  18. impl HeartbeatsDaemon {
  19. const HEARTBEAT_MAX_DELAY_MILLIS: u64 = 30;
  20. pub fn create() -> Self {
  21. let (sender, _) = tokio::sync::broadcast::channel(1);
  22. Self {
  23. start: Instant::now(),
  24. last_trigger: Arc::new(AtomicU64::new(0)),
  25. sender,
  26. }
  27. }
  28. pub fn trigger(&self, force: bool) {
  29. let now = self.start.elapsed().as_millis();
  30. // u64 is big enough for more than 500 million years.
  31. let now_lower_bits = (now & (u64::MAX) as u128) as u64;
  32. let last_trigger = self.last_trigger.load(Ordering::Acquire);
  33. let next_trigger =
  34. last_trigger.wrapping_add(Self::HEARTBEAT_MAX_DELAY_MILLIS);
  35. // Do not trigger heartbeats too frequently, unless we are forced.
  36. if force || next_trigger < now_lower_bits {
  37. let previous_trigger = self
  38. .last_trigger
  39. .fetch_max(now_lower_bits, Ordering::AcqRel);
  40. if last_trigger == previous_trigger {
  41. let _ = self.sender.send(());
  42. }
  43. }
  44. }
  45. }
  46. // Command must be
  47. // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
  48. // 1. clone: they are copied to the persister.
  49. // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
  50. // 3. serialize: they are converted to bytes to persist.
  51. impl<Command: ReplicableCommand> Raft<Command> {
  52. /// Schedules tasks that send heartbeats to peers.
  53. ///
  54. /// One task is scheduled for each peer. The task sleeps for a duration
  55. /// specified by `interval`, wakes up, builds the request message to send
  56. /// and delegates the actual RPC-sending to another task before going back
  57. /// to sleep.
  58. ///
  59. /// The sleeping task does nothing if we are not the leader.
  60. ///
  61. /// The request message is a stripped down version of `AppendEntries`. The
  62. /// response from the peer is ignored.
  63. pub(crate) fn schedule_heartbeats(&self, interval: Duration) {
  64. for (peer_index, rpc_client) in self.peers.iter().enumerate() {
  65. if peer_index != self.me.0 {
  66. // rf is now owned by the outer async function.
  67. let rf = self.inner_state.clone();
  68. // A function that updates term with responses to heartbeats.
  69. let term_marker = self.term_marker();
  70. // A function that casts an "authoritative" vote with Ok()
  71. // responses to heartbeats.
  72. let beat_ticker = self.beat_ticker(peer_index);
  73. // A on-demand trigger to sending a heartbeat.
  74. let mut trigger = self.heartbeats_daemon.sender.subscribe();
  75. // RPC client must be cloned into the outer async function.
  76. let rpc_client = rpc_client.clone();
  77. // Shutdown signal.
  78. let keep_running = self.keep_running.clone();
  79. self.thread_pool.spawn(async move {
  80. let mut interval = tokio::time::interval(interval);
  81. while keep_running.load(Ordering::Relaxed) {
  82. let tick = interval.tick();
  83. let trigger = trigger.recv();
  84. futures_util::pin_mut!(tick, trigger);
  85. let _ =
  86. futures_util::future::select(tick, trigger).await;
  87. if let Some(args) = Self::build_heartbeat(&rf) {
  88. tokio::spawn(Self::send_heartbeat(
  89. rpc_client.clone(),
  90. args,
  91. term_marker.clone(),
  92. beat_ticker.clone(),
  93. ));
  94. }
  95. }
  96. });
  97. }
  98. }
  99. }
  100. fn build_heartbeat(
  101. rf: &Mutex<RaftState<Command>>,
  102. ) -> Option<AppendEntriesArgs<Command>> {
  103. let rf = rf.lock();
  104. if !rf.is_leader() {
  105. return None;
  106. }
  107. let last_log = rf.log.last_index_term();
  108. let args = AppendEntriesArgs {
  109. term: rf.current_term,
  110. leader_id: rf.leader_id,
  111. prev_log_index: last_log.index,
  112. prev_log_term: last_log.term,
  113. entries: vec![],
  114. leader_commit: rf.commit_index,
  115. };
  116. Some(args)
  117. }
  118. const HEARTBEAT_RETRY: usize = 1;
  119. async fn send_heartbeat(
  120. // Here rpc_client must be owned by the returned future. The returned
  121. // future is scheduled to run on a thread pool. We do not control when
  122. // the future will be run, or when it will be done with the RPC client.
  123. // If a reference is passed in, the reference essentially has to be a
  124. // static one, i.e. lives forever. Thus we chose to let the future own
  125. // the RPC client.
  126. rpc_client: impl RemoteRaft<Command>,
  127. args: AppendEntriesArgs<Command>,
  128. term_watermark: TermMarker<Command>,
  129. beat_ticker: DaemonBeatTicker,
  130. ) -> std::io::Result<()> {
  131. let term = args.term;
  132. let beat = beat_ticker.next_beat();
  133. // Passing a reference that is moved to the following closure.
  134. //
  135. // It won't work if the rpc_client of type Arc is moved into the closure
  136. // directly. To clone the Arc, the function must own a mutable reference
  137. // to it. At the same time, rpc_client.call_append_entries() returns a
  138. // future that must own a reference, too. That caused a compiling error
  139. // of FnMut allowing "references to captured variables to escape".
  140. //
  141. // By passing-in a reference instead of an Arc, the closure becomes a Fn
  142. // (no Mut), which can allow references to escape.
  143. //
  144. // Another option is to use non-move closures, in which case rpc_client
  145. // of type Arc can be passed-in directly. However that requires args to
  146. // be sync because they can be shared by more than one futures.
  147. let rpc_client = &rpc_client;
  148. let response =
  149. retry_rpc(Self::HEARTBEAT_RETRY, RPC_DEADLINE, move |_round| {
  150. rpc_client.append_entries(args.clone())
  151. })
  152. .await?;
  153. if term == response.term {
  154. beat_ticker.tick(beat);
  155. } else {
  156. term_watermark.mark(response.term);
  157. }
  158. Ok(())
  159. }
  160. }