client.rs 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. use std::sync::atomic::{AtomicUsize, Ordering};
  2. use std::sync::Once;
  3. use std::time::Duration;
  4. use labrpc::{Client, RequestMessage};
  5. use serde::de::DeserializeOwned;
  6. use serde::Serialize;
  7. use crate::common::{
  8. GetArgs, GetReply, KVRaftOptions, PutAppendArgs, PutAppendEnum,
  9. PutAppendReply, UniqueIdSequence, GET, PUT_APPEND,
  10. };
  11. use crate::common::{KVError, ValidReply};
  12. pub struct Clerk {
  13. init: Once,
  14. inner: ClerkInner,
  15. }
  16. impl Clerk {
  17. pub fn new(servers: Vec<Client>) -> Self {
  18. Self {
  19. init: Once::new(),
  20. inner: ClerkInner::new(servers),
  21. }
  22. }
  23. pub fn get<K: AsRef<str>>(&mut self, key: K) -> Option<String> {
  24. let inner = self.init_once();
  25. let key = key.as_ref();
  26. loop {
  27. match inner.get(key.to_owned(), Default::default()) {
  28. Some(val) => return val,
  29. None => {}
  30. }
  31. }
  32. }
  33. pub fn put<K: AsRef<str>, V: AsRef<str>>(
  34. &mut self,
  35. key: K,
  36. value: V,
  37. ) -> Option<()> {
  38. let inner = self.init_once();
  39. let key = key.as_ref();
  40. let value = value.as_ref();
  41. inner.put(key.to_owned(), value.to_owned(), Default::default())
  42. }
  43. pub fn append<K: AsRef<str>, V: AsRef<str>>(
  44. &mut self,
  45. key: K,
  46. value: V,
  47. ) -> Option<()> {
  48. let inner = self.init_once();
  49. let key = key.as_ref();
  50. let value = value.as_ref();
  51. inner.append(key.to_owned(), value.to_owned(), Default::default())
  52. }
  53. pub fn init_once(&mut self) -> &mut ClerkInner {
  54. let (init, inner) = (&self.init, &mut self.inner);
  55. init.call_once(|| inner.commit_sentinel());
  56. &mut self.inner
  57. }
  58. }
  59. pub struct ClerkInner {
  60. servers: Vec<Client>,
  61. last_server_index: AtomicUsize,
  62. unique_id: UniqueIdSequence,
  63. executor: tokio::runtime::Runtime,
  64. }
  65. impl ClerkInner {
  66. pub fn new(servers: Vec<Client>) -> Self {
  67. Self {
  68. servers,
  69. last_server_index: AtomicUsize::new(0),
  70. unique_id: UniqueIdSequence::new(),
  71. executor: tokio::runtime::Builder::new_current_thread()
  72. .enable_time()
  73. .build()
  74. .expect("Creating thread pool should not fail"),
  75. }
  76. }
  77. fn commit_sentinel(&mut self) {
  78. loop {
  79. let args = GetArgs {
  80. key: "".to_string(),
  81. unique_id: self.unique_id.zero(),
  82. };
  83. let reply: Option<GetReply> = self.call_rpc(GET, args, Some(1));
  84. if let Some(reply) = reply {
  85. match reply.result {
  86. Ok(_) => {
  87. if !reply.is_retry {
  88. // Discard the used unique_id.
  89. self.unique_id.inc();
  90. break;
  91. } else {
  92. // The RPC was successful, but the server has had an
  93. // exact same entry, which means someone else has taken
  94. // that clerk_id.
  95. self.unique_id = UniqueIdSequence::new();
  96. }
  97. }
  98. Err(KVError::Expired) | Err(KVError::Conflict) => {
  99. // The client ID happens to be re-used. The request does
  100. // not fail as "Duplicate", because another client has
  101. // committed more than just the sentinel.
  102. self.unique_id = UniqueIdSequence::new();
  103. }
  104. Err(_) => {}
  105. };
  106. };
  107. }
  108. }
  109. const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
  110. fn call_rpc<M, A, R>(
  111. &mut self,
  112. method: M,
  113. args: A,
  114. max_retry: Option<usize>,
  115. ) -> Option<R>
  116. where
  117. M: AsRef<str>,
  118. A: Serialize,
  119. R: DeserializeOwned + ValidReply,
  120. {
  121. let method = method.as_ref().to_owned();
  122. let data = RequestMessage::from(
  123. bincode::serialize(&args)
  124. .expect("Serialization of requests should not fail"),
  125. );
  126. let max_retry =
  127. std::cmp::max(max_retry.unwrap_or(usize::MAX), self.servers.len());
  128. let mut index = self.last_server_index.load(Ordering::Relaxed);
  129. for _ in 0..max_retry {
  130. let client = &self.servers[index];
  131. let rpc_response = self.executor.block_on(async {
  132. tokio::time::timeout(
  133. Self::DEFAULT_TIMEOUT,
  134. client.call_rpc(method.clone(), data.clone()),
  135. )
  136. .await
  137. });
  138. let reply = match rpc_response {
  139. Ok(reply) => reply,
  140. Err(e) => Err(e.into()),
  141. };
  142. if let Ok(reply) = reply {
  143. let ret: R = bincode::deserialize(reply.as_ref())
  144. .expect("Deserialization of reply should not fail");
  145. if ret.is_reply_valid() {
  146. self.last_server_index.store(index, Ordering::Relaxed);
  147. return Some(ret);
  148. }
  149. }
  150. index += 1;
  151. index %= self.servers.len();
  152. }
  153. None
  154. }
  155. /// This function returns None when
  156. /// 1. No KVServer can be reached, or
  157. /// 2. No KVServer claimed to be the leader, or
  158. /// 3. When the KVServer committed the request but it was not passed
  159. /// back to the clerk. We must retry with a new unique_id.
  160. ///
  161. /// In all 3 cases the request can be retried.
  162. ///
  163. /// This function do not expect a Conflict request with the same unique_id.
  164. pub fn get(
  165. &mut self,
  166. key: String,
  167. options: KVRaftOptions,
  168. ) -> Option<Option<String>> {
  169. let args = GetArgs {
  170. key,
  171. unique_id: self.unique_id.inc(),
  172. };
  173. let reply: GetReply = self.call_rpc(GET, args, options.max_retry)?;
  174. match reply.result {
  175. Ok(val) => Some(val),
  176. Err(KVError::Conflict) => panic!("We should never see a conflict."),
  177. _ => None,
  178. }
  179. }
  180. fn put_append(
  181. &mut self,
  182. key: String,
  183. value: String,
  184. op: PutAppendEnum,
  185. options: KVRaftOptions,
  186. ) -> Option<()> {
  187. let args = PutAppendArgs {
  188. key,
  189. value,
  190. op,
  191. unique_id: self.unique_id.inc(),
  192. };
  193. let reply: PutAppendReply =
  194. self.call_rpc(PUT_APPEND, args, options.max_retry)?;
  195. assert!(reply.result.is_ok());
  196. Some(())
  197. }
  198. pub fn put(
  199. &mut self,
  200. key: String,
  201. value: String,
  202. options: KVRaftOptions,
  203. ) -> Option<()> {
  204. self.put_append(key, value, PutAppendEnum::Put, options)
  205. }
  206. pub fn append(
  207. &mut self,
  208. key: String,
  209. value: String,
  210. options: KVRaftOptions,
  211. ) -> Option<()> {
  212. self.put_append(key, value, PutAppendEnum::Append, options)
  213. }
  214. }