heartbeats.rs 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. use std::sync::atomic::Ordering;
  2. use std::sync::Arc;
  3. use std::time::Duration;
  4. use parking_lot::Mutex;
  5. use crate::utils::{retry_rpc, RPC_DEADLINE};
  6. use crate::{AppendEntriesArgs, Raft, RaftState, RpcClient};
  7. // Command must be
  8. // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
  9. // 1. clone: they are copied to the persister.
  10. // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
  11. // 3. serialize: they are converted to bytes to persist.
  12. impl<Command> Raft<Command>
  13. where
  14. Command: 'static + Clone + Send + serde::Serialize,
  15. {
  16. pub(crate) fn schedule_heartbeats(&self, interval: Duration) {
  17. for (peer_index, rpc_client) in self.peers.iter().enumerate() {
  18. if peer_index != self.me.0 {
  19. // rf is now owned by the outer async function.
  20. let rf = self.inner_state.clone();
  21. // RPC client must be cloned into the outer async function.
  22. let rpc_client = rpc_client.clone();
  23. // Shutdown signal.
  24. let keep_running = self.keep_running.clone();
  25. self.thread_pool.spawn(async move {
  26. let mut interval = tokio::time::interval(interval);
  27. while keep_running.load(Ordering::SeqCst) {
  28. interval.tick().await;
  29. if let Some(args) = Self::build_heartbeat(&rf) {
  30. tokio::spawn(Self::send_heartbeat(
  31. rpc_client.clone(),
  32. args,
  33. ));
  34. }
  35. }
  36. });
  37. }
  38. }
  39. }
  40. fn build_heartbeat(
  41. rf: &Mutex<RaftState<Command>>,
  42. ) -> Option<AppendEntriesArgs<Command>> {
  43. let rf = rf.lock();
  44. if !rf.is_leader() {
  45. return None;
  46. }
  47. let last_log = rf.log.last_index_term();
  48. let args = AppendEntriesArgs {
  49. term: rf.current_term,
  50. leader_id: rf.leader_id,
  51. prev_log_index: last_log.index,
  52. prev_log_term: last_log.term,
  53. entries: vec![],
  54. leader_commit: rf.commit_index,
  55. };
  56. Some(args)
  57. }
  58. const HEARTBEAT_RETRY: usize = 1;
  59. async fn send_heartbeat(
  60. rpc_client: Arc<RpcClient>,
  61. args: AppendEntriesArgs<Command>,
  62. ) -> std::io::Result<()> {
  63. // Passing a reference that is moved to the following closure.
  64. //
  65. // It won't work if the rpc_client of type Arc is moved into the closure
  66. // directly. To clone the Arc, the function must own a mutable reference
  67. // to it. At the same time, rpc_client.call_append_entries() returns a
  68. // future that must own a reference, too. That caused a compiling error
  69. // of FnMut allowing "references to captured variables to escape".
  70. //
  71. // By passing-in a reference instead of an Arc, the closure becomes a Fn
  72. // (no Mut), which can allow references to escape.
  73. //
  74. // Another option is to use non-move closures, in which case rpc_client
  75. // of type Arc can be passed-in directly. However that requires args to
  76. // be sync because they can be shared by more than one futures.
  77. let rpc_client = rpc_client.as_ref();
  78. retry_rpc(Self::HEARTBEAT_RETRY, RPC_DEADLINE, move |_round| {
  79. rpc_client.call_append_entries(args.clone())
  80. })
  81. .await?;
  82. Ok(())
  83. }
  84. }