// client.rs
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Once;
use std::time::Duration;

use crate::common::{
    CommitSentinelArgs, CommitSentinelReply, GetArgs, GetReply, KVError,
    KVRaftOptions, PutAppendArgs, PutAppendEnum, PutAppendReply,
    UniqueIdSequence, ValidReply,
};
use crate::RemoteKvraft;
  11. pub struct Clerk {
  12. init: Once,
  13. inner: ClerkInner,
  14. }
  15. impl Clerk {
  16. pub fn new(servers: Vec<impl RemoteKvraft>) -> Self {
  17. Self {
  18. init: Once::new(),
  19. inner: ClerkInner::new(servers),
  20. }
  21. }
  22. pub fn get<K: AsRef<str>>(&mut self, key: K) -> Option<String> {
  23. let inner = self.init_once();
  24. let key = key.as_ref();
  25. loop {
  26. if let Some(val) = inner.get(key.to_owned(), Default::default()) {
  27. return val;
  28. }
  29. }
  30. }
  31. pub fn put<K: AsRef<str>, V: AsRef<str>>(&mut self, key: K, value: V) {
  32. let inner = self.init_once();
  33. let key = key.as_ref();
  34. let value = value.as_ref();
  35. inner
  36. .put(key.to_owned(), value.to_owned(), Default::default())
  37. .expect("Put should never return error with unlimited retry.")
  38. }
  39. pub fn append<K: AsRef<str>, V: AsRef<str>>(&mut self, key: K, value: V) {
  40. let inner = self.init_once();
  41. let key = key.as_ref();
  42. let value = value.as_ref();
  43. inner
  44. .append(key.to_owned(), value.to_owned(), Default::default())
  45. .expect("Append should never return error with unlimited retry.")
  46. }
  47. pub fn init_once(&mut self) -> &mut ClerkInner {
  48. let (init, inner) = (&self.init, &mut self.inner);
  49. init.call_once(|| inner.commit_sentinel());
  50. &mut self.inner
  51. }
  52. }
  53. pub struct ClerkInner {
  54. servers: Vec<Box<dyn RemoteKvraft>>,
  55. last_server_index: AtomicUsize,
  56. unique_id: UniqueIdSequence,
  57. executor: tokio::runtime::Runtime,
  58. }
  59. impl ClerkInner {
  60. pub fn new(servers: Vec<impl RemoteKvraft>) -> Self {
  61. let servers = servers
  62. .into_iter()
  63. .map(|s| Box::new(s) as Box<dyn RemoteKvraft>)
  64. .collect();
  65. Self {
  66. servers,
  67. last_server_index: AtomicUsize::new(0),
  68. unique_id: UniqueIdSequence::new(),
  69. executor: tokio::runtime::Builder::new_current_thread()
  70. .enable_time()
  71. .build()
  72. .expect("Creating thread pool should not fail"),
  73. }
  74. }
  75. fn commit_sentinel(&mut self) {
  76. loop {
  77. let args = CommitSentinelArgs {
  78. unique_id: self.unique_id.zero(),
  79. };
  80. let reply: Option<CommitSentinelReply> = self.retry_rpc(
  81. |remote, args| remote.commit_sentinel(args),
  82. args,
  83. Some(1),
  84. );
  85. if let Some(reply) = reply {
  86. match reply.result {
  87. Ok(_) => {
  88. // Discard the used unique_id.
  89. self.unique_id.inc();
  90. break;
  91. }
  92. Err(KVError::Expired) | Err(KVError::Conflict) => {
  93. // The client ID happens to be re-used. The request does
  94. // not fail as "Duplicate", because another client has
  95. // committed more than just the sentinel.
  96. self.unique_id = UniqueIdSequence::new();
  97. }
  98. Err(_) => {}
  99. };
  100. };
  101. }
  102. }
  103. const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
  104. pub fn retry_rpc<'a, Func, Fut, Args, Reply>(
  105. &'a mut self,
  106. mut future_func: Func,
  107. args: Args,
  108. max_retry: Option<usize>,
  109. ) -> Option<Reply>
  110. where
  111. Args: Clone,
  112. Reply: ValidReply,
  113. Fut: Future<Output = std::io::Result<Reply>> + Send + 'a,
  114. Func: FnMut(&'a dyn RemoteKvraft, Args) -> Fut,
  115. {
  116. let max_retry =
  117. std::cmp::max(max_retry.unwrap_or(usize::MAX), self.servers.len());
  118. let mut index = self.last_server_index.load(Ordering::Relaxed);
  119. for _ in 0..max_retry {
  120. let client = &self.servers[index];
  121. let rpc_response = self.executor.block_on(async {
  122. tokio::time::timeout(
  123. Self::DEFAULT_TIMEOUT,
  124. future_func(client.as_ref(), args.clone()),
  125. )
  126. .await
  127. });
  128. let reply = match rpc_response {
  129. Ok(reply) => reply,
  130. Err(e) => Err(e.into()),
  131. };
  132. if let Ok(ret) = reply {
  133. if ret.is_reply_valid() {
  134. self.last_server_index.store(index, Ordering::Relaxed);
  135. return Some(ret);
  136. }
  137. }
  138. index += 1;
  139. index %= self.servers.len();
  140. }
  141. None
  142. }
  143. /// This function returns None when
  144. /// 1. No KVServer can be reached, or
  145. /// 2. No KVServer claimed to be the leader, or
  146. /// 3. When the KVServer committed the request but it was not passed
  147. /// back to the clerk. We must retry with a new unique_id.
  148. ///
  149. /// In all 3 cases the request can be retried.
  150. ///
  151. /// This function do not expect a Conflict request with the same unique_id.
  152. pub fn get(
  153. &mut self,
  154. key: String,
  155. options: KVRaftOptions,
  156. ) -> Option<Option<String>> {
  157. let args = GetArgs {
  158. key,
  159. unique_id: self.unique_id.inc(),
  160. };
  161. let reply: GetReply = self.retry_rpc(
  162. |remote, args| remote.get(args),
  163. args,
  164. options.max_retry,
  165. )?;
  166. match reply.result {
  167. Ok(val) => Some(val),
  168. Err(KVError::Conflict) => panic!("We should never see a conflict."),
  169. _ => None,
  170. }
  171. }
  172. /// This function returns None when
  173. /// 1. No KVServer can be reached, or
  174. /// 2. No KVServer claimed to be the leader.
  175. ///
  176. /// Some(()) is returned if the request has been committed previously, under
  177. /// the assumption is that two different requests with the same unique_id
  178. /// must be identical.
  179. ///
  180. /// This function do not expect a Conflict request with the same unique_id.
  181. fn put_append(
  182. &mut self,
  183. key: String,
  184. value: String,
  185. op: PutAppendEnum,
  186. options: KVRaftOptions,
  187. ) -> Option<()> {
  188. let args = PutAppendArgs {
  189. key,
  190. value,
  191. op,
  192. unique_id: self.unique_id.inc(),
  193. };
  194. let reply: PutAppendReply = self.retry_rpc(
  195. |remote, args| remote.put_append(args),
  196. args,
  197. options.max_retry,
  198. )?;
  199. match reply.result {
  200. Ok(val) => Some(val),
  201. Err(KVError::Expired) => Some(()),
  202. Err(KVError::Conflict) => panic!("We should never see a conflict."),
  203. _ => None,
  204. }
  205. }
  206. pub fn put(
  207. &mut self,
  208. key: String,
  209. value: String,
  210. options: KVRaftOptions,
  211. ) -> Option<()> {
  212. self.put_append(key, value, PutAppendEnum::Put, options)
  213. }
  214. pub fn append(
  215. &mut self,
  216. key: String,
  217. value: String,
  218. options: KVRaftOptions,
  219. ) -> Option<()> {
  220. self.put_append(key, value, PutAppendEnum::Append, options)
  221. }
  222. }