//! utils.rs — RPC retry helper, test-only argument builders, a `Sync`
//! mpsc sender wrapper, and a global tokio runtime registry.
  1. use std::future::Future;
  2. use std::time::Duration;
  3. pub async fn retry_rpc<'a, Func, Fut, T>(
  4. max_retry: usize,
  5. deadline: Duration,
  6. mut task_gen: Func,
  7. ) -> std::io::Result<T>
  8. where
  9. Fut: Future<Output = std::io::Result<T>> + Send + 'a,
  10. Func: FnMut(usize) -> Fut,
  11. {
  12. for i in 0..max_retry {
  13. if i != 0 {
  14. tokio::time::sleep(Duration::from_millis((1 << i) * 10)).await;
  15. }
  16. // Not timed-out.
  17. #[allow(clippy::collapsible_match)]
  18. if let Ok(reply) = tokio::time::timeout(deadline, task_gen(i)).await {
  19. // And no error
  20. if let Ok(reply) = reply {
  21. return Ok(reply);
  22. }
  23. }
  24. }
  25. Err(std::io::Error::new(
  26. std::io::ErrorKind::TimedOut,
  27. format!("Timed out after {} retries", max_retry),
  28. ))
  29. }
/// Default deadline applied to a single RPC attempt.
pub const RPC_DEADLINE: Duration = Duration::from_secs(2);
  31. #[cfg(feature = "integration-test")]
  32. pub mod integration_test {
  33. use crate::{
  34. AppendEntriesArgs, AppendEntriesReply, Peer, RequestVoteArgs,
  35. RequestVoteReply, Term,
  36. };
  37. pub fn make_request_vote_args(
  38. term: Term,
  39. peer_id: usize,
  40. last_log_index: usize,
  41. last_log_term: Term,
  42. ) -> RequestVoteArgs {
  43. RequestVoteArgs {
  44. term,
  45. candidate_id: Peer(peer_id),
  46. last_log_index,
  47. last_log_term,
  48. }
  49. }
  50. pub fn make_append_entries_args<Command>(
  51. term: Term,
  52. leader_id: usize,
  53. prev_log_index: usize,
  54. prev_log_term: Term,
  55. leader_commit: usize,
  56. ) -> AppendEntriesArgs<Command> {
  57. AppendEntriesArgs {
  58. term,
  59. leader_id: Peer(leader_id),
  60. prev_log_index,
  61. prev_log_term,
  62. entries: vec![],
  63. leader_commit,
  64. }
  65. }
  66. pub fn unpack_request_vote_reply(reply: RequestVoteReply) -> (Term, bool) {
  67. (reply.term, reply.vote_granted)
  68. }
  69. pub fn unpack_append_entries_reply(
  70. reply: AppendEntriesReply,
  71. ) -> (Term, bool) {
  72. (reply.term, reply.success)
  73. }
  74. }
  75. /// A `std::sync::mpsc::Sender` that is also `Sync`.
  76. ///
  77. /// The builtin `Sender` is not sync, because it uses internal mutability to
  78. /// implement an optimization for non-shared one-shot sending. The queue that
  79. /// backs the sender initially accepts only one item from a single producer.
  80. /// If the sender is cloned, the internal queue turns into a multi-producer
  81. /// multi-shot queue. After that, the internal mutability is never invoked
  82. /// again for the sender. The `Sender` structure becomes essentially immutable
  83. /// and thus, `Sync`.
  84. ///
  85. /// This optimization, and the internal mutability is meaningless for the
  86. /// purpose of this crate. `SharedSender` forces the transition into a shared
  87. /// queue, and declares itself `Sync`.
  88. ///
  89. /// Note that the same reasoning does not apply to the `Receiver`. There are
  90. /// more levels of mutability in the `Receiver`.
  91. #[derive(Clone, Debug)]
  92. pub struct SharedSender<T>(std::sync::mpsc::Sender<T>);
  93. unsafe impl<T> Sync for SharedSender<T> where T: Sync {}
  94. // A better way to implement this might be the following.
  95. //
  96. // unsafe impl<T> Sync for SharedSender<T> where
  97. // std::sync::mpsc::Flavor<T>::Shared: Sync {}
  98. impl<T> SharedSender<T> {
  99. /// Create a shared sender.
  100. pub fn new(inner: std::sync::mpsc::Sender<T>) -> SharedSender<T> {
  101. // Force the transition to a shared queue in Sender.
  102. let _clone = inner.clone();
  103. SharedSender(inner)
  104. }
  105. /// A proxy to `std::syc::mpsc::Sender::send()`.
  106. pub fn send(&self, t: T) -> Result<(), std::sync::mpsc::SendError<T>> {
  107. self.0.send(t)
  108. }
  109. }
  110. impl<T> From<std::sync::mpsc::Sender<T>> for SharedSender<T> {
  111. fn from(inner: std::sync::mpsc::Sender<T>) -> Self {
  112. Self::new(inner)
  113. }
  114. }
  115. impl<T> From<SharedSender<T>> for std::sync::mpsc::Sender<T> {
  116. fn from(this: SharedSender<T>) -> Self {
  117. this.0
  118. }
  119. }
lazy_static::lazy_static! {
    /// Global registry mapping a random id to an owned tokio `Runtime`.
    /// `ThreadPoolHolder::new` parks runtimes here so holders only need to
    /// carry a clonable `Handle`; `take()` removes the runtime by its id.
    static ref THREAD_POOLS: parking_lot::Mutex<std::collections::HashMap<u64, tokio::runtime::Runtime>> =
        parking_lot::Mutex::new(std::collections::HashMap::new());
}
/// A clonable handle to a tokio runtime whose owning `Runtime` is stored in
/// the global `THREAD_POOLS` registry.
#[derive(Clone)]
pub(crate) struct ThreadPoolHolder {
    // Key into `THREAD_POOLS`; `take()` uses it to reclaim the runtime.
    id: u64,
    // Cached handle so `Deref` works without touching the registry lock.
    handle: tokio::runtime::Handle,
}
  129. impl ThreadPoolHolder {
  130. pub fn new(runtime: tokio::runtime::Runtime) -> Self {
  131. let handle = runtime.handle().clone();
  132. loop {
  133. let id: u64 = rand::random();
  134. if let std::collections::hash_map::Entry::Vacant(v) =
  135. THREAD_POOLS.lock().entry(id)
  136. {
  137. v.insert(runtime);
  138. break Self { id, handle };
  139. }
  140. }
  141. }
  142. pub fn take(self) -> Option<tokio::runtime::Runtime> {
  143. THREAD_POOLS.lock().remove(&self.id)
  144. }
  145. }
// Dereferencing a holder yields the runtime handle, so callers can invoke
// `Handle` methods (e.g. spawning tasks) directly on the holder.
impl std::ops::Deref for ThreadPoolHolder {
    type Target = tokio::runtime::Handle;
    fn deref(&self) -> &Self::Target {
        &self.handle
    }
}