|
|
@@ -15,6 +15,10 @@ struct SharedProgress {
|
|
|
indexes: Mutex<SharedIndexes>,
|
|
|
}
|
|
|
|
|
|
+/// Progress a peer during log sync.
|
|
|
+///
|
|
|
+/// The number of unclaimed sync requests is stored here. This struct also
|
|
|
+/// contains the last known of the length of the logs held by a peer.
|
|
|
#[derive(Clone)]
|
|
|
#[repr(align(64))]
|
|
|
pub(crate) struct PeerProgress {
|
|
|
@@ -36,20 +40,28 @@ impl PeerProgress {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- pub fn should_schedule(&self) -> bool {
|
|
|
- self.internal.opening.fetch_add(1, Ordering::AcqRel) == 0
|
|
|
- }
|
|
|
-
|
|
|
+ /// Claim all pending sync requests and clear the request counter.
|
|
|
pub fn take_task(&self) -> bool {
|
|
|
self.internal.opening.swap(0, Ordering::AcqRel) != 0
|
|
|
}
|
|
|
|
|
|
+ /// Increase the number of sync requests by one. A new round of log syncing
|
|
|
+ /// should be started if this is the first of a group of requests. No new
|
|
|
+ /// round is needed if the previous requests have not been claimed.
|
|
|
+ pub fn should_schedule(&self) -> bool {
|
|
|
+ self.internal.opening.fetch_add(1, Ordering::AcqRel) == 0
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Reset progress data stored for a peer when a new term is started.
|
|
|
pub fn reset_progress(&self, next_index: Index) {
|
|
|
let mut internal = self.internal.indexes.lock();
|
|
|
internal.next_index = next_index;
|
|
|
internal.current_step = 0;
|
|
|
}
|
|
|
|
|
|
+ /// Record failure of serving a sync request. The failure is usually caused
|
|
|
+ /// by peers disagreeing on the base commit. Here we chose to step back
|
|
|
+ /// exponentially to limit the rounds of syncing required.
|
|
|
pub fn record_failure(&self, committed_index: Index) {
|
|
|
let mut internal = self.internal.indexes.lock();
|
|
|
let step = &mut internal.current_step;
|
|
|
@@ -70,12 +82,15 @@ impl PeerProgress {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /// Record a success in serving a sync request. Move the pointer to the next
|
|
|
+ /// log item to sync.
|
|
|
pub fn record_success(&self, match_index: Index) {
|
|
|
let mut internal = self.internal.indexes.lock();
|
|
|
internal.next_index = match_index + 1;
|
|
|
internal.current_step = 0;
|
|
|
}
|
|
|
|
|
|
+ /// Returns the next log index to sync.
|
|
|
pub fn next_index(&self) -> Index {
|
|
|
self.internal.indexes.lock().next_index
|
|
|
}
|