client.rs 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. use std::sync::atomic::{AtomicUsize, Ordering};
  2. use std::sync::Once;
  3. use labrpc::{Client, RequestMessage};
  4. use serde::de::DeserializeOwned;
  5. use serde::Serialize;
  6. use crate::common::{
  7. GetArgs, GetReply, KVRaftOptions, PutAppendArgs, PutAppendEnum,
  8. PutAppendReply, UniqueIdSequence, GET, PUT_APPEND,
  9. };
  10. use crate::common::{KVError, ValidReply};
  11. pub struct Clerk {
  12. init: Once,
  13. inner: ClerkInner,
  14. }
  15. impl Clerk {
  16. pub fn new(servers: Vec<Client>) -> Self {
  17. Self {
  18. init: Once::new(),
  19. inner: ClerkInner::new(servers),
  20. }
  21. }
  22. pub fn get<K: AsRef<str>>(&mut self, key: K) -> Option<String> {
  23. let inner = self.init_once();
  24. let key = key.as_ref();
  25. loop {
  26. match inner.get(key.to_owned(), Default::default()) {
  27. Some(val) => return val,
  28. None => {}
  29. }
  30. }
  31. }
  32. pub fn put<K: AsRef<str>, V: AsRef<str>>(
  33. &mut self,
  34. key: K,
  35. value: V,
  36. ) -> Option<()> {
  37. let inner = self.init_once();
  38. let key = key.as_ref();
  39. let value = value.as_ref();
  40. inner.put(key.to_owned(), value.to_owned(), Default::default())
  41. }
  42. pub fn append<K: AsRef<str>, V: AsRef<str>>(
  43. &mut self,
  44. key: K,
  45. value: V,
  46. ) -> Option<()> {
  47. let inner = self.init_once();
  48. let key = key.as_ref();
  49. let value = value.as_ref();
  50. inner.append(key.to_owned(), value.to_owned(), Default::default())
  51. }
  52. pub fn init_once(&mut self) -> &mut ClerkInner {
  53. let (init, inner) = (&self.init, &mut self.inner);
  54. init.call_once(|| inner.commit_sentinel());
  55. &mut self.inner
  56. }
  57. }
  58. pub struct ClerkInner {
  59. servers: Vec<Client>,
  60. last_server_index: AtomicUsize,
  61. unique_id: UniqueIdSequence,
  62. executor: tokio::runtime::Runtime,
  63. }
  64. impl ClerkInner {
  65. pub fn new(servers: Vec<Client>) -> Self {
  66. Self {
  67. servers,
  68. last_server_index: AtomicUsize::new(0),
  69. unique_id: UniqueIdSequence::new(),
  70. executor: tokio::runtime::Builder::new_multi_thread()
  71. .thread_name("kvraft-clerk")
  72. .worker_threads(1)
  73. .build()
  74. .expect("Creating thread pool should not fail"),
  75. }
  76. }
  77. fn commit_sentinel(&mut self) {
  78. loop {
  79. let args = GetArgs {
  80. key: "".to_string(),
  81. unique_id: self.unique_id.zero(),
  82. };
  83. let reply: Option<GetReply> = self.call_rpc(GET, args, Some(1));
  84. if let Some(reply) = reply {
  85. match reply.result {
  86. Ok(_) => {
  87. if !reply.is_retry {
  88. // Discard the used unique_id.
  89. self.unique_id.inc();
  90. break;
  91. } else {
  92. // The RPC was successful, but the server has had an
  93. // exact same entry, which means someone else has taken
  94. // that clerk_id.
  95. self.unique_id = UniqueIdSequence::new();
  96. }
  97. }
  98. Err(KVError::Expired) | Err(KVError::Conflict) => {
  99. // The client ID happens to be re-used. The request does
  100. // not fail as "Duplicate", because another client has
  101. // committed more than just the sentinel.
  102. self.unique_id = UniqueIdSequence::new();
  103. }
  104. Err(_) => {}
  105. };
  106. };
  107. }
  108. }
  109. fn call_rpc<M, A, R>(
  110. &mut self,
  111. method: M,
  112. args: A,
  113. max_retry: Option<usize>,
  114. ) -> Option<R>
  115. where
  116. M: AsRef<str>,
  117. A: Serialize,
  118. R: DeserializeOwned + ValidReply,
  119. {
  120. let method = method.as_ref().to_owned();
  121. let data = RequestMessage::from(
  122. bincode::serialize(&args)
  123. .expect("Serialization of requests should not fail"),
  124. );
  125. let max_retry =
  126. std::cmp::max(max_retry.unwrap_or(usize::MAX), self.servers.len());
  127. let mut index = self.last_server_index.load(Ordering::Relaxed);
  128. for _ in 0..max_retry {
  129. let client = &self.servers[index];
  130. let reply = self
  131. .executor
  132. .block_on(client.call_rpc(method.clone(), data.clone()));
  133. if let Ok(reply) = reply {
  134. let ret: R = bincode::deserialize(reply.as_ref())
  135. .expect("Deserialization of reply should not fail");
  136. if ret.is_reply_valid() {
  137. self.last_server_index.store(index, Ordering::Relaxed);
  138. return Some(ret);
  139. }
  140. }
  141. index += 1;
  142. index %= self.servers.len();
  143. }
  144. None
  145. }
  146. /// This function returns None when
  147. /// 1. No KVServer can be reached, or
  148. /// 2. No KVServer claimed to be the leader, or
  149. /// 3. When the KVServer committed the request but it was not passed
  150. /// back to the clerk. We must retry with a new unique_id.
  151. ///
  152. /// In all 3 cases the request can be retried.
  153. ///
  154. /// This function do not expect a Conflict request with the same unique_id.
  155. pub fn get(
  156. &mut self,
  157. key: String,
  158. options: KVRaftOptions,
  159. ) -> Option<Option<String>> {
  160. let args = GetArgs {
  161. key,
  162. unique_id: self.unique_id.inc(),
  163. };
  164. let reply: GetReply = self.call_rpc(GET, args, options.max_retry)?;
  165. match reply.result {
  166. Ok(val) => Some(val),
  167. Err(KVError::Conflict) => panic!("We should never see a conflict."),
  168. _ => None,
  169. }
  170. }
  171. fn put_append(
  172. &mut self,
  173. key: String,
  174. value: String,
  175. op: PutAppendEnum,
  176. options: KVRaftOptions,
  177. ) -> Option<()> {
  178. let args = PutAppendArgs {
  179. key,
  180. value,
  181. op,
  182. unique_id: self.unique_id.inc(),
  183. };
  184. let reply: PutAppendReply =
  185. self.call_rpc(PUT_APPEND, args, options.max_retry)?;
  186. assert!(reply.result.is_ok());
  187. Some(())
  188. }
  189. pub fn put(
  190. &mut self,
  191. key: String,
  192. value: String,
  193. options: KVRaftOptions,
  194. ) -> Option<()> {
  195. self.put_append(key, value, PutAppendEnum::Put, options)
  196. }
  197. pub fn append(
  198. &mut self,
  199. key: String,
  200. value: String,
  201. options: KVRaftOptions,
  202. ) -> Option<()> {
  203. self.put_append(key, value, PutAppendEnum::Append, options)
  204. }
  205. }