client.rs 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. use super::common::{
  2. GetArgs, GetReply, KVRaftOptions, PutAppendArgs, PutAppendEnum,
  3. PutAppendReply, UniqueIdSequence, GET, PUT_APPEND,
  4. };
  5. use common::{KVError, ValidReply};
  6. use labrpc::{Client, RequestMessage};
  7. use serde::de::DeserializeOwned;
  8. use serde::Serialize;
  9. use std::sync::atomic::{AtomicUsize, Ordering};
  10. use std::sync::Once;
  11. pub struct Clerk {
  12. init: Once,
  13. pub inner: ClerkInner,
  14. }
  15. impl Clerk {
  16. pub fn new(servers: Vec<Client>) -> Self {
  17. Self {
  18. init: Once::new(),
  19. inner: ClerkInner::new(servers),
  20. }
  21. }
  22. pub fn get<K: AsRef<str>>(&mut self, key: K) -> Option<String> {
  23. let (init, inner) = (&self.init, &mut self.inner);
  24. init.call_once(|| inner.commit_sentinel());
  25. let key = key.as_ref();
  26. loop {
  27. match inner.get(key.to_owned(), Default::default()) {
  28. Some(val) => return val,
  29. None => {}
  30. }
  31. }
  32. }
  33. pub fn put<K: AsRef<str>, V: AsRef<str>>(
  34. &mut self,
  35. key: K,
  36. value: V,
  37. ) -> Option<()> {
  38. let (init, inner) = (&self.init, &mut self.inner);
  39. init.call_once(|| inner.commit_sentinel());
  40. let key = key.as_ref();
  41. let value = value.as_ref();
  42. inner.put(key.to_owned(), value.to_owned(), Default::default())
  43. }
  44. pub fn append<K: AsRef<str>, V: AsRef<str>>(
  45. &mut self,
  46. key: K,
  47. value: V,
  48. ) -> Option<()> {
  49. let (init, inner) = (&self.init, &mut self.inner);
  50. init.call_once(|| inner.commit_sentinel());
  51. let key = key.as_ref();
  52. let value = value.as_ref();
  53. inner.append(key.to_owned(), value.to_owned(), Default::default())
  54. }
  55. }
  56. pub struct ClerkInner {
  57. servers: Vec<Client>,
  58. last_server_index: AtomicUsize,
  59. unique_id: UniqueIdSequence,
  60. executor: tokio::runtime::Runtime,
  61. }
  62. impl ClerkInner {
  63. pub fn new(servers: Vec<Client>) -> Self {
  64. Self {
  65. servers,
  66. last_server_index: AtomicUsize::new(0),
  67. unique_id: UniqueIdSequence::new(),
  68. executor: tokio::runtime::Builder::new_multi_thread()
  69. .thread_name("kvraft-clerk")
  70. .worker_threads(1)
  71. .build()
  72. .expect("Creating thread pool should not fail"),
  73. }
  74. }
  75. fn commit_sentinel(&mut self) {
  76. loop {
  77. let args = GetArgs {
  78. key: "".to_string(),
  79. unique_id: self.unique_id.zero(),
  80. };
  81. let reply: Option<GetReply> = self.call_rpc(GET, args, Some(1));
  82. if let Some(reply) = reply {
  83. match reply.result {
  84. Ok(_) => {
  85. if !reply.is_retry {
  86. // Discard the used unique_id.
  87. self.unique_id.inc();
  88. break;
  89. } else {
  90. // The RPC was successful, but the server has had an
  91. // exact same entry, which means someone else has taken
  92. // that clerk_id.
  93. self.unique_id = UniqueIdSequence::new();
  94. }
  95. }
  96. Err(KVError::Expired) | Err(KVError::Conflict) => {
  97. // The client ID happens to be re-used. The request does
  98. // not fail as "Duplicate", because another client has
  99. // committed more than just the sentinel.
  100. self.unique_id = UniqueIdSequence::new();
  101. }
  102. Err(_) => {}
  103. };
  104. };
  105. }
  106. }
  107. fn call_rpc<M, A, R>(
  108. &mut self,
  109. method: M,
  110. args: A,
  111. max_retry: Option<usize>,
  112. ) -> Option<R>
  113. where
  114. M: AsRef<str>,
  115. A: Serialize,
  116. R: DeserializeOwned + ValidReply,
  117. {
  118. let method = method.as_ref().to_owned();
  119. let data = RequestMessage::from(
  120. bincode::serialize(&args)
  121. .expect("Serialization of requests should not fail"),
  122. );
  123. let max_retry =
  124. std::cmp::max(max_retry.unwrap_or(usize::MAX), self.servers.len());
  125. let mut index = self.last_server_index.load(Ordering::Relaxed);
  126. for _ in 0..max_retry {
  127. let client = &self.servers[index];
  128. let reply = self
  129. .executor
  130. .block_on(client.call_rpc(method.clone(), data.clone()));
  131. if let Ok(reply) = reply {
  132. let ret: R = bincode::deserialize(reply.as_ref())
  133. .expect("Deserialization of reply should not fail");
  134. if ret.is_reply_valid() {
  135. self.last_server_index.store(index, Ordering::Relaxed);
  136. return Some(ret);
  137. }
  138. }
  139. index += 1;
  140. index %= self.servers.len();
  141. }
  142. None
  143. }
  144. /// This function returns None when
  145. /// 1. No KVServer can be reached, or
  146. /// 2. No KVServer claimed to be the leader, or
  147. /// 3. When the KVServer committed the request but it was not passed
  148. /// back to the clerk. We must retry with a new unique_id.
  149. ///
  150. /// In all 3 cases the request can be retried.
  151. ///
  152. /// This function do not expect a Conflict request with the same unique_id.
  153. pub fn get(
  154. &mut self,
  155. key: String,
  156. options: KVRaftOptions,
  157. ) -> Option<Option<String>> {
  158. let args = GetArgs {
  159. key,
  160. unique_id: self.unique_id.inc(),
  161. };
  162. let reply: GetReply = self.call_rpc(GET, args, options.max_retry)?;
  163. match reply.result {
  164. Ok(val) => Some(val),
  165. Err(KVError::Conflict) => panic!("We should never see a conflict."),
  166. _ => None,
  167. }
  168. }
  169. fn put_append(
  170. &mut self,
  171. key: String,
  172. value: String,
  173. op: PutAppendEnum,
  174. options: KVRaftOptions,
  175. ) -> Option<()> {
  176. let args = PutAppendArgs {
  177. key,
  178. value,
  179. op,
  180. unique_id: self.unique_id.inc(),
  181. };
  182. let reply: PutAppendReply =
  183. self.call_rpc(PUT_APPEND, args, options.max_retry)?;
  184. assert!(reply.result.is_ok());
  185. Some(())
  186. }
  187. pub fn put(
  188. &mut self,
  189. key: String,
  190. value: String,
  191. options: KVRaftOptions,
  192. ) -> Option<()> {
  193. self.put_append(key, value, PutAppendEnum::Put, options)
  194. }
  195. pub fn append(
  196. &mut self,
  197. key: String,
  198. value: String,
  199. options: KVRaftOptions,
  200. ) -> Option<()> {
  201. self.put_append(key, value, PutAppendEnum::Append, options)
  202. }
  203. }