beat_ticker.rs 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. use std::ops::Deref;
  2. use std::sync::atomic::{AtomicU64, Ordering};
  3. use std::sync::Arc;
  4. /// A beat is one request sent to a peer with a success response.
  5. /// The `usize` within is the unique ID of the request.
  6. #[cfg(not(test))]
  7. #[derive(Debug, Eq, Ord, PartialOrd, PartialEq)]
  8. pub(crate) struct Beat(u64);
  9. #[cfg(test)]
  10. #[derive(Debug, Eq, Ord, PartialOrd, PartialEq)]
  11. pub(crate) struct Beat(pub(crate) u64);
  12. /// A `BeatTicker` issues unique request IDs and records successful runs.
  13. ///
  14. /// Each peer should have its own `BeatTicker`. Requests are ordered by the wall
  15. /// time they call `BeatTicker::next_beat()`. Each successful request marks the
  16. /// recognition from the peer that the sender (this instance) is the leader.
  17. ///
  18. /// The leader status is continuous for a certain term. Imagine the following
  19. /// scenario. We are elected leader at absolute time `X`, and send a message
  20. /// to a peer at time `Y` (`Y > X`). The peer confirms the leader status by
  21. /// replying to the message. We can then assume that the peer recognizes us as
  22. /// the leader in entire time interval `[X, Y]`.
  23. ///
  24. /// For each term, the starting point of the interval (`X`) is fixed. Newer
  25. /// requests extends the interval further. Thus, we only need to record the last
  26. /// confirmation from a peer.
  27. ///
  28. /// At any time `T` that has `T > X`, if there are more than `N/2` peers
  29. /// confirmed the leader status after `T`, we can be sure that at time `T` we
  30. /// were the leader.
  31. pub(crate) struct BeatTicker {
  32. beat_count: AtomicU64,
  33. ticked: AtomicU64,
  34. }
  35. impl BeatTicker {
  36. /// Creates a `BeatTicker`.
  37. /// The first unique request ID issued by the ticker will be 1. The initial
  38. /// value of successful request will start at ID 0.
  39. fn create() -> Self {
  40. Self {
  41. beat_count: AtomicU64::new(1),
  42. ticked: AtomicU64::new(0),
  43. }
  44. }
  45. /// Issues the next unique request ID.
  46. pub fn next_beat(&self) -> Beat {
  47. let count = self.beat_count.fetch_add(1, Ordering::AcqRel);
  48. assert_ne!(count, u64::MAX, "BeatTicker count overflow");
  49. Beat(count)
  50. }
  51. /// Returns the newest beat (request ID).
  52. pub fn current_beat(&self) -> Beat {
  53. Beat(self.beat_count.load(Ordering::Acquire))
  54. }
  55. /// Marks a beat (request) as successful.
  56. pub fn tick(&self, beat: Beat) {
  57. self.ticked.fetch_max(beat.0, Ordering::AcqRel);
  58. }
  59. /// Returns the last successful beat (request ID).
  60. pub fn ticked(&self) -> Beat {
  61. Beat(self.ticked.load(Ordering::Acquire))
  62. }
  63. }
  64. /// A smart pointer to share `BeatTicker` among threads and tasks.
  65. #[derive(Clone)]
  66. pub(crate) struct SharedBeatTicker(Arc<BeatTicker>);
  67. impl SharedBeatTicker {
  68. pub fn create() -> Self {
  69. Self(Arc::new(BeatTicker::create()))
  70. }
  71. }
  72. impl Deref for SharedBeatTicker {
  73. type Target = BeatTicker;
  74. fn deref(&self) -> &Self::Target {
  75. self.0.deref()
  76. }
  77. }