async_client.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. use std::future::Future;
  2. use std::sync::atomic::{AtomicUsize, Ordering};
  3. use tokio::sync::OnceCell;
  4. use crate::common::{
  5. CommitSentinelArgs, CommitSentinelReply, GetArgs, GetReply, KVError,
  6. KVRaftOptions, PutAppendArgs, PutAppendEnum, PutAppendReply, UniqueId,
  7. UniqueIdSequence, ValidReply,
  8. };
  9. use crate::RemoteKvraft;
  10. pub struct AsyncClerk {
  11. pub inner: AsyncClient,
  12. }
  13. impl AsyncClerk {
  14. pub fn new(servers: Vec<impl RemoteKvraft>) -> Self {
  15. Self {
  16. inner: AsyncClient::new(servers),
  17. }
  18. }
  19. pub async fn get<K: AsRef<str>>(&self, key: K) -> Option<String> {
  20. self.inner
  21. .get(key.as_ref().to_owned(), KVRaftOptions::default())
  22. .await
  23. .expect("Get should never return error with unlimited retry.")
  24. }
  25. pub async fn put<K: AsRef<str>, V: AsRef<str>>(&self, key: K, value: V) {
  26. let key = key.as_ref();
  27. let value = value.as_ref();
  28. self.inner
  29. .put(key.to_owned(), value.to_owned(), KVRaftOptions::default())
  30. .await
  31. .expect("Put should never return error with unlimited retry.")
  32. }
  33. pub async fn append<K: AsRef<str>, V: AsRef<str>>(&self, key: K, value: V) {
  34. let key = key.as_ref();
  35. let value = value.as_ref();
  36. self.inner
  37. .append(key.to_owned(), value.to_owned(), KVRaftOptions::default())
  38. .await
  39. .expect("Append should never return error with unlimited retry.")
  40. }
  41. }
  42. pub struct AsyncClient {
  43. servers: Vec<Box<dyn RemoteKvraft>>,
  44. last_server_index: AtomicUsize,
  45. unique_id: OnceCell<UniqueIdSequence>,
  46. }
  47. impl AsyncClient {
  48. pub fn new(servers: Vec<impl RemoteKvraft>) -> Self {
  49. let servers = servers
  50. .into_iter()
  51. .map(|s| Box::new(s) as Box<dyn RemoteKvraft>)
  52. .collect();
  53. Self {
  54. servers,
  55. last_server_index: AtomicUsize::new(0),
  56. unique_id: OnceCell::new(),
  57. }
  58. }
  59. async fn next_unique_id(&self) -> UniqueId {
  60. self.unique_id
  61. .get_or_init(|| self.commit_sentinel())
  62. .await
  63. .inc()
  64. }
  65. async fn zero_unique_id(&self) -> UniqueId {
  66. self.unique_id
  67. .get_or_init(|| self.commit_sentinel())
  68. .await
  69. .zero()
  70. }
  71. async fn commit_sentinel(&self) -> UniqueIdSequence {
  72. loop {
  73. let unique_id = UniqueIdSequence::new();
  74. let args = CommitSentinelArgs {
  75. unique_id: unique_id.inc(),
  76. };
  77. let reply: Option<CommitSentinelReply> = self
  78. .retry_rpc(
  79. |remote, args| remote.commit_sentinel(args),
  80. args,
  81. None,
  82. )
  83. .await;
  84. if let Some(reply) = reply {
  85. match reply.result {
  86. Ok(_) => {
  87. break unique_id;
  88. }
  89. Err(KVError::Expired) | Err(KVError::Conflict) => {
  90. // The client ID happens to be re-used. The request does
  91. // not fail as "Duplicate", because another client has
  92. // committed more than just the sentinel.
  93. // Do nothing.
  94. }
  95. Err(e) => {
  96. panic!(
  97. "Unexpected error with indefinite retry: {:?}",
  98. e
  99. );
  100. }
  101. };
  102. };
  103. }
  104. }
  105. pub async fn retry_rpc<'a, Func, Fut, Args, Reply>(
  106. &'a self,
  107. mut future_func: Func,
  108. args: Args,
  109. max_retry: Option<usize>,
  110. ) -> Option<Reply>
  111. where
  112. Args: Clone,
  113. Reply: ValidReply,
  114. Fut: Future<Output = std::io::Result<Reply>> + Send + 'a,
  115. Func: FnMut(&'a dyn RemoteKvraft, Args) -> Fut,
  116. {
  117. let max_retry =
  118. std::cmp::max(max_retry.unwrap_or(usize::MAX), self.servers.len());
  119. let mut index = self.last_server_index.load(Ordering::Relaxed);
  120. for _ in 0..max_retry {
  121. let client = &self.servers[index];
  122. let rpc_response = future_func(client.as_ref(), args.clone()).await;
  123. if let Ok(ret) = rpc_response {
  124. if ret.is_reply_valid() {
  125. self.last_server_index.store(index, Ordering::Relaxed);
  126. return Some(ret);
  127. }
  128. }
  129. index += 1;
  130. index %= self.servers.len();
  131. }
  132. None
  133. }
  134. /// This function returns None when
  135. /// 1. No KVServer can be reached, or
  136. /// 2. No KVServer claimed to be the leader, or
  137. /// 3. When the KVServer committed the request but it was not passed
  138. /// back to the clerk. We must retry with a new unique_id.
  139. ///
  140. /// In all 3 cases the request can be retried.
  141. ///
  142. /// This function do not expect a Conflict request with the same unique_id.
  143. pub async fn get(
  144. &self,
  145. key: String,
  146. options: KVRaftOptions,
  147. ) -> Option<Option<String>> {
  148. self.zero_unique_id().await;
  149. let args = GetArgs { key };
  150. let reply: GetReply = self
  151. .retry_rpc(|remote, args| remote.get(args), args, options.max_retry)
  152. .await?;
  153. match reply.result {
  154. Ok(val) => Some(val),
  155. Err(KVError::Expired) => panic!("Get requests do not expire."),
  156. Err(KVError::Conflict) => panic!("We should never see a conflict."),
  157. _ => None,
  158. }
  159. }
  160. /// This function returns None when
  161. /// 1. No KVServer can be reached, or
  162. /// 2. No KVServer claimed to be the leader.
  163. ///
  164. /// Some(()) is returned if the request has been committed previously, under
  165. /// the assumption is that two different requests with the same unique_id
  166. /// must be identical.
  167. ///
  168. /// This function do not expect a Conflict request with the same unique_id.
  169. async fn put_append(
  170. &self,
  171. key: String,
  172. value: String,
  173. op: PutAppendEnum,
  174. options: KVRaftOptions,
  175. ) -> Option<()> {
  176. let unique_id = self.next_unique_id().await;
  177. let args = PutAppendArgs {
  178. key,
  179. value,
  180. op,
  181. unique_id,
  182. };
  183. let reply: PutAppendReply = self
  184. .retry_rpc(
  185. |remote, args| remote.put_append(args),
  186. args,
  187. options.max_retry,
  188. )
  189. .await?;
  190. match reply.result {
  191. Ok(val) => Some(val),
  192. Err(KVError::Expired) => Some(()),
  193. Err(KVError::Conflict) => panic!("We should never see a conflict."),
  194. _ => None,
  195. }
  196. }
  197. pub async fn put(
  198. &self,
  199. key: String,
  200. value: String,
  201. options: KVRaftOptions,
  202. ) -> Option<()> {
  203. self.put_append(key, value, PutAppendEnum::Put, options)
  204. .await
  205. }
  206. pub async fn append(
  207. &self,
  208. key: String,
  209. value: String,
  210. options: KVRaftOptions,
  211. ) -> Option<()> {
  212. self.put_append(key, value, PutAppendEnum::Append, options)
  213. .await
  214. }
  215. }
  #[cfg(test)]
  mod tests {
      use std::collections::hash_map::DefaultHasher;
      use std::collections::HashMap;
      use std::hash::Hasher;
      use std::sync::Arc;

      use async_trait::async_trait;
      use futures::executor::block_on;
      use parking_lot::Mutex;

      use super::*;

      /// A fake server that is only leader when certain requests are received.
      ///
      /// All fakes created by `create_client` share one `data` map; which fake
      /// answers a request is decided by the hash of the key.
      #[derive(Clone)]
      struct FakeRemoteKvraft {
          /// Total number of fake servers in the group.
          peer_size: usize,
          /// Position of this fake within the group, in `0..peer_size`.
          id: usize,
          /// Key-value store shared by every fake in the group.
          data: Arc<Mutex<HashMap<String, String>>>,
      }

      impl FakeRemoteKvraft {
          /// Returns true if the hash of `key` modulo `peer_size` equals the
          /// ID of this fake, i.e. exactly one fake is leader for each key.
          fn is_leader(&self, key: &str) -> bool {
              let mut hasher = DefaultHasher::new();
              hasher.write(key.as_bytes());
              let hash_code = hasher.finish();
              (hash_code % (self.peer_size as u64)) as usize == self.id
          }
      }

      #[async_trait]
      impl RemoteKvraft for FakeRemoteKvraft {
          async fn get(&self, args: GetArgs) -> std::io::Result<GetReply> {
              let result = if self.is_leader(&args.key) {
                  Ok(self.data.lock().get(&args.key).cloned())
              } else {
                  Err(KVError::NotLeader)
              };
              Ok(GetReply { result })
          }

          async fn put_append(
              &self,
              args: PutAppendArgs,
          ) -> std::io::Result<PutAppendReply> {
              assert_ne!(0, args.key.len());
              let result = if self.is_leader(&args.key) {
                  let mut data = self.data.lock();
                  match args.op {
                      PutAppendEnum::Put => {
                          data.insert(args.key, args.value);
                      }
                      PutAppendEnum::Append => {
                          // Append the new value after the existing one. A
                          // missing key starts from the empty string instead
                          // of panicking.
                          data.entry(args.key)
                              .or_default()
                              .push_str(&args.value);
                      }
                  }
                  Ok(())
              } else {
                  Err(KVError::NotLeader)
              };
              Ok(PutAppendReply { result })
          }

          async fn commit_sentinel(
              &self,
              args: CommitSentinelArgs,
          ) -> std::io::Result<CommitSentinelReply> {
              // The sentinel is recorded under the empty key, so the leader
              // for "" is the one that accepts it.
              let result = if self.is_leader("") {
                  let mut data = self.data.lock();
                  // Each client is expected to commit its sentinel only once.
                  assert!(!data.contains_key(""));
                  let clerk_id = args.unique_id.clerk_id.to_string();
                  data.insert("".to_owned(), clerk_id);
                  Ok(())
              } else {
                  Err(KVError::NotLeader)
              };
              Ok(CommitSentinelReply { result })
          }
      }

      /// Creates a clerk backed by five fakes that share one data map, which
      /// is pre-populated with a single entry.
      fn create_client() -> AsyncClerk {
          const PEER_SIZE: usize = 5;

          let data = Arc::new(Mutex::new(HashMap::new()));
          data.lock()
              .insert("What clerk?".to_owned(), "async_clerk".to_owned());

          let servers: Vec<FakeRemoteKvraft> = (0..PEER_SIZE)
              .map(|id| FakeRemoteKvraft {
                  peer_size: PEER_SIZE,
                  id,
                  data: Arc::clone(&data),
              })
              .collect();
          AsyncClerk::new(servers)
      }

      #[test]
      fn test_get_existing_data() {
          let clerk = create_client();
          let existing_data = block_on(clerk.get("What clerk?"));
          assert_eq!(Some("async_clerk".to_owned()), existing_data);
          // The first request commits the sentinel, which the fake stores
          // under the empty key.
          let client_id = block_on(clerk.get(""));
          assert!(client_id.is_some());
      }

      #[test]
      fn test_get_put_append() {
          let clerk = create_client();
          block_on(clerk.put("Date", "2017-01-01"));
          block_on(clerk.put("Balance", "97"));

          let date = block_on(clerk.get("Date"));
          assert_eq!(Some("2017-01-01".to_owned()), date);
          let balance = block_on(clerk.get("Balance"));
          assert_eq!(Some("97".to_owned()), balance);

          // Appending adds the new value after the existing one.
          block_on(clerk.append("Balance", "00"));
          let balance = block_on(clerk.get("Balance"));
          assert_eq!(Some("9700".to_owned()), balance);

          block_on(clerk.put("Balance", "10000"));
          let balance = block_on(clerk.get("Balance"));
          assert_eq!(Some("10000".to_owned()), balance);
      }
  }