client.rs 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. use super::common::{
  2. GetArgs, GetReply, KVRaftOptions, PutAppendArgs, PutAppendEnum,
  3. PutAppendReply, UniqueIdSequence, GET, PUT_APPEND,
  4. };
  5. use common::{KVError, ValidReply};
  6. use labrpc::{Client, RequestMessage};
  7. use serde::de::DeserializeOwned;
  8. use serde::Serialize;
  9. use std::sync::atomic::{AtomicUsize, Ordering};
  10. use std::sync::Once;
  11. pub struct Clerk {
  12. init: Once,
  13. pub inner: ClerkInner,
  14. }
  15. impl Clerk {
  16. pub fn new(servers: Vec<Client>) -> Self {
  17. Self {
  18. init: Once::new(),
  19. inner: ClerkInner::new(servers),
  20. }
  21. }
  22. pub fn get(&mut self, key: String) -> Option<String> {
  23. let (init, inner) = (&self.init, &mut self.inner);
  24. init.call_once(|| inner.commit_sentinel());
  25. inner.get(key, Default::default())
  26. }
  27. pub fn put(&mut self, key: String, value: String) -> Option<()> {
  28. let (init, inner) = (&self.init, &mut self.inner);
  29. init.call_once(|| inner.commit_sentinel());
  30. inner.put(key, value, Default::default())
  31. }
  32. pub fn append(&mut self, key: String, value: String) -> Option<()> {
  33. let (init, inner) = (&self.init, &mut self.inner);
  34. init.call_once(|| inner.commit_sentinel());
  35. inner.append(key, value, Default::default())
  36. }
  37. }
  38. pub struct ClerkInner {
  39. servers: Vec<Client>,
  40. last_server_index: AtomicUsize,
  41. unique_id: UniqueIdSequence,
  42. executor: tokio::runtime::Runtime,
  43. }
  44. impl ClerkInner {
  45. pub fn new(servers: Vec<Client>) -> Self {
  46. Self {
  47. servers,
  48. last_server_index: AtomicUsize::new(0),
  49. unique_id: UniqueIdSequence::new(),
  50. executor: tokio::runtime::Builder::new_multi_thread()
  51. .thread_name("kvraft-clerk")
  52. .worker_threads(1)
  53. .build()
  54. .expect("Creating thread pool should not fail"),
  55. }
  56. }
  57. fn commit_sentinel(&mut self) {
  58. loop {
  59. let args = GetArgs {
  60. key: "".to_string(),
  61. unique_id: self.unique_id.zero(),
  62. };
  63. let reply: Option<GetReply> = self.call_rpc(GET, args, Some(1));
  64. if let Some(reply) = reply {
  65. match reply.result {
  66. Ok(_) => {
  67. if !reply.is_retry {
  68. // Discard the used unique_id.
  69. self.unique_id.inc();
  70. break;
  71. } else {
  72. // The RPC was successful, but the server has had an
  73. // exact same entry, which means someone else has taken
  74. // that clerk_id.
  75. self.unique_id = UniqueIdSequence::new();
  76. }
  77. }
  78. Err(KVError::Expired) | Err(KVError::Conflict) => {
  79. // The client ID happens to be re-used. The request does
  80. // not fail as "Duplicate", because another client has
  81. // committed more than just the sentinel.
  82. self.unique_id = UniqueIdSequence::new();
  83. }
  84. Err(_) => {}
  85. };
  86. };
  87. }
  88. }
  89. fn call_rpc<M, A, R>(
  90. &mut self,
  91. method: M,
  92. args: A,
  93. max_retry: Option<usize>,
  94. ) -> Option<R>
  95. where
  96. M: AsRef<str>,
  97. A: Serialize,
  98. R: DeserializeOwned + ValidReply,
  99. {
  100. let method = method.as_ref().to_owned();
  101. let data = RequestMessage::from(
  102. bincode::serialize(&args)
  103. .expect("Serialization of requests should not fail"),
  104. );
  105. let max_retry =
  106. std::cmp::max(max_retry.unwrap_or(usize::MAX), self.servers.len());
  107. let mut index = self.last_server_index.load(Ordering::Relaxed);
  108. for _ in 0..max_retry {
  109. let client = &self.servers[index];
  110. let reply = self
  111. .executor
  112. .block_on(client.call_rpc(method.clone(), data.clone()));
  113. if let Ok(reply) = reply {
  114. let ret: R = bincode::deserialize(reply.as_ref())
  115. .expect("Deserialization of reply should not fail");
  116. if ret.is_reply_valid() {
  117. self.last_server_index.store(index, Ordering::Relaxed);
  118. return Some(ret);
  119. }
  120. }
  121. index += 1;
  122. index %= self.servers.len();
  123. }
  124. None
  125. }
  126. pub fn get(
  127. &mut self,
  128. key: String,
  129. options: KVRaftOptions,
  130. ) -> Option<String> {
  131. let args = GetArgs {
  132. key,
  133. unique_id: self.unique_id.inc(),
  134. };
  135. let reply: GetReply = self.call_rpc(GET, args, options.max_retry)?;
  136. match reply.result {
  137. Ok(val) => val,
  138. _ => None,
  139. }
  140. }
  141. fn put_append(
  142. &mut self,
  143. key: String,
  144. value: String,
  145. op: PutAppendEnum,
  146. options: KVRaftOptions,
  147. ) -> Option<()> {
  148. let args = PutAppendArgs {
  149. key,
  150. value,
  151. op,
  152. unique_id: self.unique_id.inc(),
  153. };
  154. let reply: PutAppendReply =
  155. self.call_rpc(PUT_APPEND, args, options.max_retry)?;
  156. assert!(reply.result.is_ok());
  157. Some(())
  158. }
  159. pub fn put(
  160. &mut self,
  161. key: String,
  162. value: String,
  163. options: KVRaftOptions,
  164. ) -> Option<()> {
  165. self.put_append(key, value, PutAppendEnum::Put, options)
  166. }
  167. pub fn append(
  168. &mut self,
  169. key: String,
  170. value: String,
  171. options: KVRaftOptions,
  172. ) -> Option<()> {
  173. self.put_append(key, value, PutAppendEnum::Append, options)
  174. }
  175. }