// client.rs (7.2 KB)
  1. use std::future::Future;
  2. use std::sync::atomic::{AtomicUsize, Ordering};
  3. use std::sync::Once;
  4. use std::time::Duration;
  5. use crate::common::{
  6. CommitSentinelArgs, CommitSentinelReply, GetArgs, GetReply, KVError,
  7. KVRaftOptions, PutAppendArgs, PutAppendEnum, PutAppendReply,
  8. UniqueIdSequence, ValidReply,
  9. };
  10. use crate::RemoteKvraft;
  11. pub struct Clerk {
  12. init: Once,
  13. inner: ClerkInner,
  14. }
  15. impl Clerk {
  16. pub fn new(servers: Vec<impl RemoteKvraft>) -> Self {
  17. Self {
  18. init: Once::new(),
  19. inner: ClerkInner::new(servers),
  20. }
  21. }
  22. pub fn get<K: AsRef<str>>(&mut self, key: K) -> Option<String> {
  23. let inner = self.init_once();
  24. let key = key.as_ref();
  25. inner
  26. .get(key.to_owned(), KVRaftOptions::default())
  27. .expect("Get should never return error with unlimited retry.")
  28. }
  29. pub fn put<K: AsRef<str>, V: AsRef<str>>(&mut self, key: K, value: V) {
  30. let inner = self.init_once();
  31. let key = key.as_ref();
  32. let value = value.as_ref();
  33. inner
  34. .put(key.to_owned(), value.to_owned(), KVRaftOptions::default())
  35. .expect("Put should never return error with unlimited retry.")
  36. }
  37. pub fn append<K: AsRef<str>, V: AsRef<str>>(&mut self, key: K, value: V) {
  38. let inner = self.init_once();
  39. let key = key.as_ref();
  40. let value = value.as_ref();
  41. inner
  42. .append(key.to_owned(), value.to_owned(), KVRaftOptions::default())
  43. .expect("Append should never return error with unlimited retry.")
  44. }
  45. pub fn init_once(&mut self) -> &mut ClerkInner {
  46. let (init, inner) = (&self.init, &mut self.inner);
  47. init.call_once(|| inner.commit_sentinel());
  48. &mut self.inner
  49. }
  50. }
  51. pub struct ClerkInner {
  52. servers: Vec<Box<dyn RemoteKvraft>>,
  53. last_server_index: AtomicUsize,
  54. unique_id: UniqueIdSequence,
  55. executor: tokio::runtime::Runtime,
  56. }
  57. impl ClerkInner {
  58. pub fn new(servers: Vec<impl RemoteKvraft>) -> Self {
  59. let servers = servers
  60. .into_iter()
  61. .map(|s| Box::new(s) as Box<dyn RemoteKvraft>)
  62. .collect();
  63. Self {
  64. servers,
  65. last_server_index: AtomicUsize::new(0),
  66. unique_id: UniqueIdSequence::new(),
  67. executor: tokio::runtime::Builder::new_current_thread()
  68. .enable_time()
  69. .build()
  70. .expect("Creating thread pool should not fail"),
  71. }
  72. }
  73. fn commit_sentinel(&mut self) {
  74. loop {
  75. let args = CommitSentinelArgs {
  76. unique_id: self.unique_id.zero(),
  77. };
  78. let reply: Option<CommitSentinelReply> = self.retry_rpc(
  79. |remote, args| remote.commit_sentinel(args),
  80. args,
  81. None,
  82. );
  83. let Some(reply) = reply else { continue };
  84. match reply.result {
  85. Ok(_) => {
  86. // Discard the used unique_id.
  87. self.unique_id.inc();
  88. break;
  89. }
  90. Err(KVError::Expired) | Err(KVError::Conflict) => {
  91. // The client ID happens to be re-used. The request does
  92. // not fail as "Duplicate", because another client has
  93. // committed more than just the sentinel.
  94. self.unique_id = UniqueIdSequence::new();
  95. }
  96. Err(e) => {
  97. panic!("Unexpected error with indefinite retry: {e:?}")
  98. }
  99. };
  100. }
  101. }
  102. const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
  103. pub fn retry_rpc<'a, Func, Fut, Args, Reply>(
  104. &'a mut self,
  105. mut future_func: Func,
  106. args: Args,
  107. max_retry: Option<usize>,
  108. ) -> Option<Reply>
  109. where
  110. Args: Clone,
  111. Reply: ValidReply,
  112. Fut: Future<Output = std::io::Result<Reply>> + Send + 'a,
  113. Func: FnMut(&'a dyn RemoteKvraft, Args) -> Fut,
  114. {
  115. let max_retry =
  116. std::cmp::max(max_retry.unwrap_or(usize::MAX), self.servers.len());
  117. let mut index = self.last_server_index.load(Ordering::Relaxed);
  118. for _ in 0..max_retry {
  119. let client = &self.servers[index];
  120. let rpc_response = self.executor.block_on(async {
  121. tokio::time::timeout(
  122. Self::DEFAULT_TIMEOUT,
  123. future_func(client.as_ref(), args.clone()),
  124. )
  125. .await
  126. });
  127. let reply = match rpc_response {
  128. Ok(reply) => reply,
  129. Err(e) => Err(e.into()),
  130. };
  131. if let Ok(ret) = reply {
  132. if ret.is_reply_valid() {
  133. self.last_server_index.store(index, Ordering::Relaxed);
  134. return Some(ret);
  135. }
  136. }
  137. index += 1;
  138. index %= self.servers.len();
  139. }
  140. None
  141. }
  142. /// This function returns None when
  143. /// 1. No KVServer can be reached, or
  144. /// 2. No KVServer claimed to be the leader, or
  145. ///
  146. /// In both cases the request can be retried.
  147. pub fn get(
  148. &mut self,
  149. key: String,
  150. options: KVRaftOptions,
  151. ) -> Option<Option<String>> {
  152. let args = GetArgs { key };
  153. let reply: GetReply = self.retry_rpc(
  154. |remote, args| remote.get(args),
  155. args,
  156. options.max_retry,
  157. )?;
  158. match reply.result {
  159. Ok(val) => Some(val),
  160. Err(KVError::Expired) => panic!("Get requests do not expire."),
  161. Err(KVError::Conflict) => panic!("We should never see a conflict."),
  162. _ => None,
  163. }
  164. }
  165. /// This function returns None when
  166. /// 1. No KVServer can be reached, or
  167. /// 2. No KVServer claimed to be the leader.
  168. ///
  169. /// Some(()) is returned if the request has been committed previously, under
  170. /// the assumption is that two different requests with the same unique_id
  171. /// must be identical.
  172. ///
  173. /// This function do not expect a Conflict request with the same unique_id.
  174. fn put_append(
  175. &mut self,
  176. key: String,
  177. value: String,
  178. op: PutAppendEnum,
  179. options: KVRaftOptions,
  180. ) -> Option<()> {
  181. let args = PutAppendArgs {
  182. key,
  183. value,
  184. op,
  185. unique_id: self.unique_id.inc(),
  186. };
  187. let reply: PutAppendReply = self.retry_rpc(
  188. |remote, args| remote.put_append(args),
  189. args,
  190. options.max_retry,
  191. )?;
  192. match reply.result {
  193. Ok(val) => Some(val),
  194. Err(KVError::Expired) => Some(()),
  195. Err(KVError::Conflict) => panic!("We should never see a conflict."),
  196. _ => None,
  197. }
  198. }
  199. pub fn put(
  200. &mut self,
  201. key: String,
  202. value: String,
  203. options: KVRaftOptions,
  204. ) -> Option<()> {
  205. self.put_append(key, value, PutAppendEnum::Put, options)
  206. }
  207. pub fn append(
  208. &mut self,
  209. key: String,
  210. value: String,
  211. options: KVRaftOptions,
  212. ) -> Option<()> {
  213. self.put_append(key, value, PutAppendEnum::Append, options)
  214. }
  215. }