// client.rs
  1. use super::common::{
  2. GetArgs, GetReply, KVRaftOptions, KvError, PutAppendArgs, PutAppendReply,
  3. UniqueIdSequence, GET, PUT_APPEND,
  4. };
  5. use crate::kvraft::common::PutAppendEnum;
  6. use labrpc::{Client, RequestMessage};
  7. use serde::de::DeserializeOwned;
  8. use serde::Serialize;
  9. use std::sync::atomic::{AtomicUsize, Ordering};
  10. struct Clerk {
  11. servers: Vec<Client>,
  12. last_server_index: AtomicUsize,
  13. unique_id: UniqueIdSequence,
  14. executor: tokio::runtime::Runtime,
  15. }
  16. impl Clerk {
  17. pub fn new(servers: Vec<Client>) -> Self {
  18. Self {
  19. servers,
  20. last_server_index: AtomicUsize::new(0),
  21. unique_id: UniqueIdSequence::new(),
  22. executor: tokio::runtime::Builder::new_multi_thread()
  23. .thread_name("kvraft-clerk")
  24. .worker_threads(1)
  25. .build()
  26. .expect("Creating thread pool should not fail"),
  27. }
  28. }
  29. fn commit_sentinel(&mut self) {
  30. loop {
  31. let args = GetArgs {
  32. key: "".to_string(),
  33. unique_id: self.unique_id.zero(),
  34. };
  35. let reply: Option<GetReply> = self.call_rpc(GET, args, Some(1));
  36. if let Some(reply) = reply {
  37. match reply.result {
  38. Ok(_) => {
  39. if !reply.is_retry {
  40. // Discard the used unique_id.
  41. self.unique_id.inc();
  42. break;
  43. } else {
  44. // The RPC was successful, but the server has had an
  45. // exact same entry, which means someone else has taken
  46. // that clerk_id.
  47. self.unique_id = UniqueIdSequence::new();
  48. }
  49. }
  50. Err(_) => {}
  51. };
  52. };
  53. }
  54. }
  55. fn call_rpc<M: AsRef<str>, A: Clone + Serialize, R: DeserializeOwned>(
  56. &mut self,
  57. method: M,
  58. args: A,
  59. max_retry: Option<usize>,
  60. ) -> Option<R> {
  61. let method = method.as_ref().to_owned();
  62. let data = RequestMessage::from(
  63. bincode::serialize(&args)
  64. .expect("Serialization of requests should not fail"),
  65. );
  66. for _ in 0..max_retry.unwrap_or(usize::MAX) {
  67. let index = self.last_server_index.load(Ordering::Relaxed);
  68. let client = &self.servers[index];
  69. let reply = self
  70. .executor
  71. .block_on(client.call_rpc(method.clone(), data.clone()));
  72. if let Ok(reply) = reply {
  73. let ret = bincode::deserialize(reply.as_ref())
  74. .expect("Deserialization of reply should not fail");
  75. self.last_server_index.store(index, Ordering::Relaxed);
  76. return Some(ret);
  77. }
  78. }
  79. None
  80. }
  81. pub fn get(
  82. &mut self,
  83. key: String,
  84. options: KVRaftOptions,
  85. ) -> Option<String> {
  86. let args = GetArgs {
  87. key,
  88. unique_id: self.unique_id.inc(),
  89. };
  90. let reply: GetReply = self.call_rpc(GET, args, options.max_retry)?;
  91. match reply.result {
  92. Ok(val) => Some(val),
  93. Err(KvError::NoKey) => Some(Default::default()),
  94. _ => None,
  95. }
  96. }
  97. fn put_append(
  98. &mut self,
  99. key: String,
  100. value: String,
  101. op: PutAppendEnum,
  102. options: KVRaftOptions,
  103. ) -> Option<()> {
  104. let args = PutAppendArgs {
  105. key,
  106. value,
  107. op,
  108. unique_id: self.unique_id.inc(),
  109. };
  110. let reply: PutAppendReply =
  111. self.call_rpc(PUT_APPEND, args, options.max_retry)?;
  112. assert!(reply.result.is_ok());
  113. Some(())
  114. }
  115. pub fn put(
  116. &mut self,
  117. key: String,
  118. value: String,
  119. options: KVRaftOptions,
  120. ) -> Option<()> {
  121. self.put_append(key, value, PutAppendEnum::Put, options)
  122. }
  123. pub fn append(
  124. &mut self,
  125. key: String,
  126. value: String,
  127. options: KVRaftOptions,
  128. ) -> Option<()> {
  129. self.put_append(key, value, PutAppendEnum::Append, options)
  130. }
  131. }