client.rs 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. use super::common::{
  2. GetArgs, GetReply, KVRaftOptions, PutAppendArgs, PutAppendReply,
  3. UniqueIdSequence, GET, PUT_APPEND,
  4. };
  5. use crate::kvraft::common::PutAppendEnum;
  6. use labrpc::{Client, RequestMessage};
  7. use serde::de::DeserializeOwned;
  8. use serde::Serialize;
  9. use std::sync::atomic::{AtomicUsize, Ordering};
  10. use std::sync::Once;
  11. pub struct Clerk {
  12. init: Once,
  13. pub inner: ClerkInner,
  14. }
  15. impl Clerk {
  16. pub fn new(servers: Vec<Client>) -> Self {
  17. Self {
  18. init: Once::new(),
  19. inner: ClerkInner::new(servers),
  20. }
  21. }
  22. pub fn get(&mut self, key: String) -> Option<String> {
  23. let (init, inner) = (&self.init, &mut self.inner);
  24. init.call_once(|| inner.commit_sentinel());
  25. inner.get(key, Default::default())
  26. }
  27. pub fn put(
  28. &mut self,
  29. key: String,
  30. value: String,
  31. options: KVRaftOptions,
  32. ) -> Option<()> {
  33. let (init, inner) = (&self.init, &mut self.inner);
  34. init.call_once(|| inner.commit_sentinel());
  35. inner.put(key, value, options)
  36. }
  37. pub fn append(
  38. &mut self,
  39. key: String,
  40. value: String,
  41. options: KVRaftOptions,
  42. ) -> Option<()> {
  43. let (init, inner) = (&self.init, &mut self.inner);
  44. init.call_once(|| inner.commit_sentinel());
  45. inner.append(key, value, options)
  46. }
  47. }
  48. pub struct ClerkInner {
  49. servers: Vec<Client>,
  50. last_server_index: AtomicUsize,
  51. unique_id: UniqueIdSequence,
  52. executor: tokio::runtime::Runtime,
  53. }
  54. impl ClerkInner {
  55. pub fn new(servers: Vec<Client>) -> Self {
  56. Self {
  57. servers,
  58. last_server_index: AtomicUsize::new(0),
  59. unique_id: UniqueIdSequence::new(),
  60. executor: tokio::runtime::Builder::new_multi_thread()
  61. .thread_name("kvraft-clerk")
  62. .worker_threads(1)
  63. .build()
  64. .expect("Creating thread pool should not fail"),
  65. }
  66. }
  67. fn commit_sentinel(&mut self) {
  68. loop {
  69. let args = GetArgs {
  70. key: "".to_string(),
  71. unique_id: self.unique_id.zero(),
  72. };
  73. let reply: Option<GetReply> = self.call_rpc(GET, args, Some(1));
  74. if let Some(reply) = reply {
  75. match reply.result {
  76. Ok(_) => {
  77. if !reply.is_retry {
  78. // Discard the used unique_id.
  79. self.unique_id.inc();
  80. break;
  81. } else {
  82. // The RPC was successful, but the server has had an
  83. // exact same entry, which means someone else has taken
  84. // that clerk_id.
  85. self.unique_id = UniqueIdSequence::new();
  86. }
  87. }
  88. Err(_) => {}
  89. };
  90. };
  91. }
  92. }
  93. fn call_rpc<M: AsRef<str>, A: Clone + Serialize, R: DeserializeOwned>(
  94. &mut self,
  95. method: M,
  96. args: A,
  97. max_retry: Option<usize>,
  98. ) -> Option<R> {
  99. let method = method.as_ref().to_owned();
  100. let data = RequestMessage::from(
  101. bincode::serialize(&args)
  102. .expect("Serialization of requests should not fail"),
  103. );
  104. for _ in 0..max_retry.unwrap_or(usize::MAX) {
  105. let index = self.last_server_index.load(Ordering::Relaxed);
  106. let client = &self.servers[index];
  107. let reply = self
  108. .executor
  109. .block_on(client.call_rpc(method.clone(), data.clone()));
  110. if let Ok(reply) = reply {
  111. let ret = bincode::deserialize(reply.as_ref())
  112. .expect("Deserialization of reply should not fail");
  113. self.last_server_index.store(index, Ordering::Relaxed);
  114. return Some(ret);
  115. }
  116. }
  117. None
  118. }
  119. pub fn get(
  120. &mut self,
  121. key: String,
  122. options: KVRaftOptions,
  123. ) -> Option<String> {
  124. let args = GetArgs {
  125. key,
  126. unique_id: self.unique_id.inc(),
  127. };
  128. let reply: GetReply = self.call_rpc(GET, args, options.max_retry)?;
  129. match reply.result {
  130. Ok(val) => val,
  131. _ => None,
  132. }
  133. }
  134. fn put_append(
  135. &mut self,
  136. key: String,
  137. value: String,
  138. op: PutAppendEnum,
  139. options: KVRaftOptions,
  140. ) -> Option<()> {
  141. let args = PutAppendArgs {
  142. key,
  143. value,
  144. op,
  145. unique_id: self.unique_id.inc(),
  146. };
  147. let reply: PutAppendReply =
  148. self.call_rpc(PUT_APPEND, args, options.max_retry)?;
  149. assert!(reply.result.is_ok());
  150. Some(())
  151. }
  152. pub fn put(
  153. &mut self,
  154. key: String,
  155. value: String,
  156. options: KVRaftOptions,
  157. ) -> Option<()> {
  158. self.put_append(key, value, PutAppendEnum::Put, options)
  159. }
  160. pub fn append(
  161. &mut self,
  162. key: String,
  163. value: String,
  164. options: KVRaftOptions,
  165. ) -> Option<()> {
  166. self.put_append(key, value, PutAppendEnum::Append, options)
  167. }
  168. }