client.rs 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. use std::future::Future;
  2. use std::sync::atomic::{AtomicUsize, Ordering};
  3. use std::sync::Once;
  4. use std::time::Duration;
  5. use crate::common::{
  6. CommitSentinelArgs, CommitSentinelReply, GetArgs, GetReply, KVError,
  7. KVRaftOptions, PutAppendArgs, PutAppendEnum, PutAppendReply,
  8. UniqueIdSequence, ValidReply,
  9. };
  10. use crate::RemoteKvraft;
  11. pub struct Clerk {
  12. init: Once,
  13. inner: ClerkInner,
  14. }
  15. impl Clerk {
  16. pub fn new(servers: Vec<impl RemoteKvraft>) -> Self {
  17. Self {
  18. init: Once::new(),
  19. inner: ClerkInner::new(servers),
  20. }
  21. }
  22. pub fn get<K: AsRef<str>>(&mut self, key: K) -> Option<String> {
  23. let inner = self.init_once();
  24. let key = key.as_ref();
  25. inner
  26. .get(key.to_owned(), KVRaftOptions::default())
  27. .expect("Get should never return error with unlimited retry.")
  28. }
  29. pub fn put<K: AsRef<str>, V: AsRef<str>>(&mut self, key: K, value: V) {
  30. let inner = self.init_once();
  31. let key = key.as_ref();
  32. let value = value.as_ref();
  33. inner
  34. .put(key.to_owned(), value.to_owned(), KVRaftOptions::default())
  35. .expect("Put should never return error with unlimited retry.")
  36. }
  37. pub fn append<K: AsRef<str>, V: AsRef<str>>(&mut self, key: K, value: V) {
  38. let inner = self.init_once();
  39. let key = key.as_ref();
  40. let value = value.as_ref();
  41. inner
  42. .append(key.to_owned(), value.to_owned(), KVRaftOptions::default())
  43. .expect("Append should never return error with unlimited retry.")
  44. }
  45. pub fn init_once(&mut self) -> &mut ClerkInner {
  46. let (init, inner) = (&self.init, &mut self.inner);
  47. init.call_once(|| inner.commit_sentinel());
  48. &mut self.inner
  49. }
  50. }
  51. pub struct ClerkInner {
  52. servers: Vec<Box<dyn RemoteKvraft>>,
  53. last_server_index: AtomicUsize,
  54. unique_id: UniqueIdSequence,
  55. executor: tokio::runtime::Runtime,
  56. }
  57. impl ClerkInner {
  58. pub fn new(servers: Vec<impl RemoteKvraft>) -> Self {
  59. let servers = servers
  60. .into_iter()
  61. .map(|s| Box::new(s) as Box<dyn RemoteKvraft>)
  62. .collect();
  63. Self {
  64. servers,
  65. last_server_index: AtomicUsize::new(0),
  66. unique_id: UniqueIdSequence::new(),
  67. executor: tokio::runtime::Builder::new_current_thread()
  68. .enable_time()
  69. .build()
  70. .expect("Creating thread pool should not fail"),
  71. }
  72. }
  73. fn commit_sentinel(&mut self) {
  74. loop {
  75. let args = CommitSentinelArgs {
  76. unique_id: self.unique_id.zero(),
  77. };
  78. let reply: Option<CommitSentinelReply> = self.retry_rpc(
  79. |remote, args| remote.commit_sentinel(args),
  80. args,
  81. None,
  82. );
  83. if let Some(reply) = reply {
  84. match reply.result {
  85. Ok(_) => {
  86. // Discard the used unique_id.
  87. self.unique_id.inc();
  88. break;
  89. }
  90. Err(KVError::Expired) | Err(KVError::Conflict) => {
  91. // The client ID happens to be re-used. The request does
  92. // not fail as "Duplicate", because another client has
  93. // committed more than just the sentinel.
  94. self.unique_id = UniqueIdSequence::new();
  95. }
  96. Err(e) => {
  97. panic!(
  98. "Unexpected error with indefinite retry: {:?}",
  99. e
  100. );
  101. }
  102. };
  103. };
  104. }
  105. }
  106. const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
  107. pub fn retry_rpc<'a, Func, Fut, Args, Reply>(
  108. &'a mut self,
  109. mut future_func: Func,
  110. args: Args,
  111. max_retry: Option<usize>,
  112. ) -> Option<Reply>
  113. where
  114. Args: Clone,
  115. Reply: ValidReply,
  116. Fut: Future<Output = std::io::Result<Reply>> + Send + 'a,
  117. Func: FnMut(&'a dyn RemoteKvraft, Args) -> Fut,
  118. {
  119. let max_retry =
  120. std::cmp::max(max_retry.unwrap_or(usize::MAX), self.servers.len());
  121. let mut index = self.last_server_index.load(Ordering::Relaxed);
  122. for _ in 0..max_retry {
  123. let client = &self.servers[index];
  124. let rpc_response = self.executor.block_on(async {
  125. tokio::time::timeout(
  126. Self::DEFAULT_TIMEOUT,
  127. future_func(client.as_ref(), args.clone()),
  128. )
  129. .await
  130. });
  131. let reply = match rpc_response {
  132. Ok(reply) => reply,
  133. Err(e) => Err(e.into()),
  134. };
  135. if let Ok(ret) = reply {
  136. if ret.is_reply_valid() {
  137. self.last_server_index.store(index, Ordering::Relaxed);
  138. return Some(ret);
  139. }
  140. }
  141. index += 1;
  142. index %= self.servers.len();
  143. }
  144. None
  145. }
  146. /// This function returns None when
  147. /// 1. No KVServer can be reached, or
  148. /// 2. No KVServer claimed to be the leader, or
  149. ///
  150. /// In both cases the request can be retried.
  151. pub fn get(
  152. &mut self,
  153. key: String,
  154. options: KVRaftOptions,
  155. ) -> Option<Option<String>> {
  156. let args = GetArgs { key };
  157. let reply: GetReply = self.retry_rpc(
  158. |remote, args| remote.get(args),
  159. args,
  160. options.max_retry,
  161. )?;
  162. match reply.result {
  163. Ok(val) => Some(val),
  164. Err(KVError::Expired) => panic!("Get requests do not expire."),
  165. Err(KVError::Conflict) => panic!("We should never see a conflict."),
  166. _ => None,
  167. }
  168. }
  169. /// This function returns None when
  170. /// 1. No KVServer can be reached, or
  171. /// 2. No KVServer claimed to be the leader.
  172. ///
  173. /// Some(()) is returned if the request has been committed previously, under
  174. /// the assumption is that two different requests with the same unique_id
  175. /// must be identical.
  176. ///
  177. /// This function do not expect a Conflict request with the same unique_id.
  178. fn put_append(
  179. &mut self,
  180. key: String,
  181. value: String,
  182. op: PutAppendEnum,
  183. options: KVRaftOptions,
  184. ) -> Option<()> {
  185. let args = PutAppendArgs {
  186. key,
  187. value,
  188. op,
  189. unique_id: self.unique_id.inc(),
  190. };
  191. let reply: PutAppendReply = self.retry_rpc(
  192. |remote, args| remote.put_append(args),
  193. args,
  194. options.max_retry,
  195. )?;
  196. match reply.result {
  197. Ok(val) => Some(val),
  198. Err(KVError::Expired) => Some(()),
  199. Err(KVError::Conflict) => panic!("We should never see a conflict."),
  200. _ => None,
  201. }
  202. }
  203. pub fn put(
  204. &mut self,
  205. key: String,
  206. value: String,
  207. options: KVRaftOptions,
  208. ) -> Option<()> {
  209. self.put_append(key, value, PutAppendEnum::Put, options)
  210. }
  211. pub fn append(
  212. &mut self,
  213. key: String,
  214. value: String,
  215. options: KVRaftOptions,
  216. ) -> Option<()> {
  217. self.put_append(key, value, PutAppendEnum::Append, options)
  218. }
  219. }