heartbeats.rs 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. use std::sync::atomic::Ordering;
  2. use std::time::Duration;
  3. use parking_lot::Mutex;
  4. use crate::beat_ticker::SharedBeatTicker;
  5. use crate::term_marker::TermMarker;
  6. use crate::utils::{retry_rpc, RPC_DEADLINE};
  7. use crate::{AppendEntriesArgs, Raft, RaftState, RemoteRaft};
// `Command` must be
// 0. `'static`: `Raft<Command>` must be `'static` because it is moved to another thread.
// 1. `Clone`: commands are copied to the persister.
// 2. `Send`: `Arc<Mutex<Vec<LogEntry<Command>>>>` must be `Send` because it is moved to another thread.
// 3. `serde::Serialize`: commands are converted to bytes in order to be persisted.
  13. impl<Command> Raft<Command>
  14. where
  15. Command: 'static + Clone + Send + serde::Serialize,
  16. {
  17. /// Schedules tasks that send heartbeats to peers.
  18. ///
  19. /// One task is scheduled for each peer. The task sleeps for a duration
  20. /// specified by `interval`, wakes up, builds the request message to send
  21. /// and delegates the actual RPC-sending to another task before going back
  22. /// to sleep.
  23. ///
  24. /// The sleeping task does nothing if we are not the leader.
  25. ///
  26. /// The request message is a stripped down version of `AppendEntries`. The
  27. /// response from the peer is ignored.
  28. pub(crate) fn schedule_heartbeats(&self, interval: Duration) {
  29. for (peer_index, rpc_client) in self.peers.iter().enumerate() {
  30. if peer_index != self.me.0 {
  31. // rf is now owned by the outer async function.
  32. let rf = self.inner_state.clone();
  33. // A function that updates term with responses to heartbeats.
  34. let term_marker = self.term_marker();
  35. // A function that casts an "authoritative" vote with Ok()
  36. // responses to heartbeats.
  37. let beat_ticker = self.beat_ticker(peer_index);
  38. // RPC client must be cloned into the outer async function.
  39. let rpc_client = rpc_client.clone();
  40. // Shutdown signal.
  41. let keep_running = self.keep_running.clone();
  42. self.thread_pool.spawn(async move {
  43. let mut interval = tokio::time::interval(interval);
  44. while keep_running.load(Ordering::SeqCst) {
  45. interval.tick().await;
  46. if let Some(args) = Self::build_heartbeat(&rf) {
  47. tokio::spawn(Self::send_heartbeat(
  48. rpc_client.clone(),
  49. args,
  50. term_marker.clone(),
  51. beat_ticker.clone(),
  52. ));
  53. }
  54. }
  55. });
  56. }
  57. }
  58. }
  59. fn build_heartbeat(
  60. rf: &Mutex<RaftState<Command>>,
  61. ) -> Option<AppendEntriesArgs<Command>> {
  62. let rf = rf.lock();
  63. if !rf.is_leader() {
  64. return None;
  65. }
  66. let last_log = rf.log.last_index_term();
  67. let args = AppendEntriesArgs {
  68. term: rf.current_term,
  69. leader_id: rf.leader_id,
  70. prev_log_index: last_log.index,
  71. prev_log_term: last_log.term,
  72. entries: vec![],
  73. leader_commit: rf.commit_index,
  74. };
  75. Some(args)
  76. }
  77. const HEARTBEAT_RETRY: usize = 1;
  78. async fn send_heartbeat(
  79. // Here rpc_client must be owned by the returned future. The returned
  80. // future is scheduled to run on a thread pool. We do not control when
  81. // the future will be run, or when it will be done with the RPC client.
  82. // If a reference is passed in, the reference essentially has to be a
  83. // static one, i.e. lives forever. Thus we chose to let the future own
  84. // the RPC client.
  85. rpc_client: impl RemoteRaft<Command>,
  86. args: AppendEntriesArgs<Command>,
  87. term_watermark: TermMarker<Command>,
  88. beat_ticker: SharedBeatTicker,
  89. ) -> std::io::Result<()> {
  90. let term = args.term;
  91. let beat = beat_ticker.next_beat();
  92. // Passing a reference that is moved to the following closure.
  93. //
  94. // It won't work if the rpc_client of type Arc is moved into the closure
  95. // directly. To clone the Arc, the function must own a mutable reference
  96. // to it. At the same time, rpc_client.call_append_entries() returns a
  97. // future that must own a reference, too. That caused a compiling error
  98. // of FnMut allowing "references to captured variables to escape".
  99. //
  100. // By passing-in a reference instead of an Arc, the closure becomes a Fn
  101. // (no Mut), which can allow references to escape.
  102. //
  103. // Another option is to use non-move closures, in which case rpc_client
  104. // of type Arc can be passed-in directly. However that requires args to
  105. // be sync because they can be shared by more than one futures.
  106. let rpc_client = &rpc_client;
  107. let response =
  108. retry_rpc(Self::HEARTBEAT_RETRY, RPC_DEADLINE, move |_round| {
  109. rpc_client.append_entries(args.clone())
  110. })
  111. .await?;
  112. if term == response.term {
  113. beat_ticker.tick(beat);
  114. } else {
  115. term_watermark.mark(response.term);
  116. }
  117. Ok(())
  118. }
  119. }