| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- use std::ops::Deref;
- use std::sync::atomic::{AtomicU64, Ordering};
- use std::sync::Arc;
/// A beat identifies one request that was sent to a peer and answered with a
/// success response.
///
/// The wrapped `u64` is the unique ID of that request. The derived ordering
/// and equality compare the wrapped IDs.
#[cfg(not(test))]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct Beat(u64);

/// Test-build variant of `Beat`.
///
/// Identical to the non-test definition except that the ID field is visible
/// crate-wide.
#[cfg(test)]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct Beat(pub(crate) u64);
/// Issues unique request IDs and remembers the latest request that succeeded.
///
/// Every peer should own a separate `BeatTicker`. Requests are ordered by the
/// wall time at which they call `BeatTicker::next_beat()`. A successful
/// request means the peer recognized the sender (this instance) as the leader.
///
/// Leadership is continuous within a term. Suppose we are elected leader at
/// absolute time `X` and send a message to a peer at time `Y` (`Y > X`). When
/// the peer replies, it confirms our leader status, so we can assume the peer
/// recognized us as the leader during the whole interval `[X, Y]`.
///
/// Within one term the start of that interval (`X`) is fixed, and each newer
/// request only extends the interval further. It is therefore enough to
/// record the last confirmation received from a peer.
///
/// For any time `T` with `T > X`: if more than `N/2` peers have confirmed the
/// leader status after `T`, we can be sure that we were the leader at time
/// `T`.
pub(crate) struct BeatTicker {
    /// Largest request ID that has been marked successful so far.
    ticked: AtomicU64,
    /// Request counter; its current value is the ID handed out by the next
    /// call to `next_beat()`.
    beat_count: AtomicU64,
}
- impl BeatTicker {
- /// Creates a `BeatTicker`.
- /// The first unique request ID issued by the ticker will be 1. The initial
- /// value of successful request will start at ID 0.
- fn create() -> Self {
- Self {
- beat_count: AtomicU64::new(1),
- ticked: AtomicU64::new(0),
- }
- }
- /// Issues the next unique request ID.
- pub fn next_beat(&self) -> Beat {
- let count = self.beat_count.fetch_add(1, Ordering::AcqRel);
- assert_ne!(count, u64::MAX, "BeatTicker count overflow");
- Beat(count)
- }
- /// Returns the newest beat (request ID).
- pub fn current_beat(&self) -> Beat {
- Beat(self.beat_count.load(Ordering::Acquire))
- }
- /// Marks a beat (request) as successful.
- pub fn tick(&self, beat: Beat) {
- self.ticked.fetch_max(beat.0, Ordering::AcqRel);
- }
- /// Returns the last successful beat (request ID).
- pub fn ticked(&self) -> Beat {
- Beat(self.ticked.load(Ordering::Acquire))
- }
- }
- /// A smart pointer to share `BeatTicker` among threads and tasks.
- #[derive(Clone)]
- pub(crate) struct SharedBeatTicker(Arc<BeatTicker>);
- impl SharedBeatTicker {
- pub fn create() -> Self {
- Self(Arc::new(BeatTicker::create()))
- }
- }
- impl Deref for SharedBeatTicker {
- type Target = BeatTicker;
- fn deref(&self) -> &Self::Target {
- self.0.deref()
- }
- }
|