//! Per-peer replication progress: a pending-task notification counter
//! plus next-index / backoff-step bookkeeping shared across clones.
  1. use std::sync::atomic::{AtomicUsize, Ordering};
  2. use std::sync::Arc;
  3. use parking_lot::Mutex;
  4. use crate::{Index, Peer};
/// Replication indexes for one peer, guarded by `SharedProgress::indexes`.
struct SharedIndexes {
    // Next log index to send to this peer; starts at 1 and never drops
    // below 1 (see `record_failure`).
    next_index: Index,
    // Exponential-backoff step used by `record_failure`; reset to 0 on
    // success/reset, capped at 5.
    current_step: i64,
}
/// State shared (via `Arc`) between all clones of a `PeerProgress`.
struct SharedProgress {
    // Count of pending schedule notifications: incremented by
    // `should_schedule`, drained to zero by `take_task`.
    opening: AtomicUsize,
    // Mutable index bookkeeping, protected by a `parking_lot::Mutex`.
    indexes: Mutex<SharedIndexes>,
}
/// Handle to a single peer's replication progress.
///
/// `Clone` is cheap: clones share the same `SharedProgress` through the
/// inner `Arc`, so updates made via one handle are visible to all.
#[derive(Clone)]
// 64-byte alignment — presumably to keep each handle on its own cache
// line and avoid false sharing between peers; NOTE(review): confirm.
#[repr(align(64))]
pub(crate) struct PeerProgress {
    pub peer: Peer,
    internal: Arc<SharedProgress>,
}
  19. impl PeerProgress {
  20. pub fn create(peer_index: usize) -> Self {
  21. Self {
  22. peer: Peer(peer_index),
  23. internal: Arc::new(SharedProgress {
  24. opening: AtomicUsize::new(0),
  25. indexes: Mutex::new(SharedIndexes {
  26. next_index: 1,
  27. current_step: 0,
  28. }),
  29. }),
  30. }
  31. }
  32. pub fn should_schedule(&self) -> bool {
  33. self.internal.opening.fetch_add(1, Ordering::AcqRel) == 0
  34. }
  35. pub fn take_task(&self) -> bool {
  36. self.internal.opening.swap(0, Ordering::AcqRel) != 0
  37. }
  38. pub fn reset_progress(&self, next_index: Index) {
  39. let mut internal = self.internal.indexes.lock();
  40. internal.next_index = next_index;
  41. internal.current_step = 0;
  42. }
  43. pub fn record_failure(&self, committed_index: Index) {
  44. let mut internal = self.internal.indexes.lock();
  45. let step = &mut internal.current_step;
  46. if *step < 5 {
  47. *step += 1;
  48. }
  49. let diff = 4 << *step;
  50. let next_index = &mut internal.next_index;
  51. if diff >= *next_index {
  52. *next_index = 1usize;
  53. } else {
  54. *next_index -= diff;
  55. }
  56. if *next_index < committed_index {
  57. *next_index = committed_index;
  58. }
  59. }
  60. pub fn record_success(&self, match_index: Index) {
  61. let mut internal = self.internal.indexes.lock();
  62. internal.next_index = match_index + 1;
  63. internal.current_step = 0;
  64. }
  65. pub fn next_index(&self) -> Index {
  66. self.internal.indexes.lock().next_index
  67. }
  68. }