utils.rs 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. use std::future::Future;
  2. use std::time::Duration;
  3. pub async fn retry_rpc<'a, Func, Fut, T>(
  4. max_retry: usize,
  5. deadline: Duration,
  6. mut task_gen: Func,
  7. ) -> std::io::Result<T>
  8. where
  9. Fut: Future<Output = std::io::Result<T>> + Send + 'a,
  10. Func: FnMut(usize) -> Fut,
  11. {
  12. for i in 0..max_retry {
  13. if i != 0 {
  14. tokio::time::sleep(Duration::from_millis((1 << i) * 10)).await;
  15. }
  16. // Not timed-out.
  17. #[allow(clippy::collapsible_match)]
  18. if let Ok(reply) = tokio::time::timeout(deadline, task_gen(i)).await {
  19. // And no error
  20. if let Ok(reply) = reply {
  21. return Ok(reply);
  22. }
  23. }
  24. }
  25. Err(std::io::Error::new(
  26. std::io::ErrorKind::TimedOut,
  27. format!("Timed out after {} retries", max_retry),
  28. ))
  29. }
  30. pub const RPC_DEADLINE: Duration = Duration::from_secs(2);
  31. #[cfg(feature = "integration-test")]
  32. pub mod integration_test {
  33. use crate::{
  34. AppendEntriesArgs, AppendEntriesReply, Peer, RequestVoteArgs,
  35. RequestVoteReply, Term,
  36. };
  37. pub fn make_request_vote_args(
  38. term: Term,
  39. peer_id: usize,
  40. last_log_index: usize,
  41. last_log_term: Term,
  42. ) -> RequestVoteArgs {
  43. RequestVoteArgs {
  44. term,
  45. candidate_id: Peer(peer_id),
  46. last_log_index,
  47. last_log_term,
  48. }
  49. }
  50. pub fn make_append_entries_args<Command>(
  51. term: Term,
  52. leader_id: usize,
  53. prev_log_index: usize,
  54. prev_log_term: Term,
  55. leader_commit: usize,
  56. ) -> AppendEntriesArgs<Command> {
  57. AppendEntriesArgs {
  58. term,
  59. leader_id: Peer(leader_id),
  60. prev_log_index,
  61. prev_log_term,
  62. entries: vec![],
  63. leader_commit,
  64. }
  65. }
  66. pub fn unpack_request_vote_reply(reply: RequestVoteReply) -> (Term, bool) {
  67. (reply.term, reply.vote_granted)
  68. }
  69. pub fn unpack_append_entries_reply(
  70. reply: AppendEntriesReply,
  71. ) -> (Term, bool) {
  72. (reply.term, reply.success)
  73. }
  74. }