server.rs 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. use super::common::{
  2. ClerkId, GetArgs, GetReply, KVError, PutAppendArgs, PutAppendEnum,
  3. PutAppendReply, UniqueId,
  4. };
  5. use parking_lot::{Condvar, Mutex};
  6. use ruaft::{Persister, Raft, RpcClient};
  7. use std::collections::hash_map::Entry;
  8. use std::collections::HashMap;
  9. use std::sync::mpsc::{channel, Receiver};
  10. use std::sync::Arc;
  11. use std::time::Duration;
/// A key-value server that pairs its own state with a `Raft` instance
/// replicating `UniqueKVOp` commands.
pub struct KVServer {
    /// Key-value data and request bookkeeping, guarded by a single lock.
    state: Mutex<KVServerState>,
    /// The Raft instance that client operations are submitted to.
    rf: Mutex<Raft<UniqueKVOp>>,
    // snapshot
    // NOTE(review): placeholder only; no snapshot-related field exists yet.
}
/// A command paired with a `usize` index, as carried over the apply channel.
/// NOTE(review): the index is presumably the Raft log index; confirm against
/// the `Raft` apply-command callback contract.
type IndexedCommand = (usize, UniqueKVOp);
/// A key-value operation tagged with the ID of the request that issued it.
/// This is the command type handed to `Raft`.
#[derive(Clone, Default, Serialize, Deserialize)]
pub struct UniqueKVOp {
    /// The operation to run against the key-value map.
    op: KVOp,
    /// Identifies the request; used to detect re-delivered operations.
    unique_id: UniqueId,
}
/// Mutable server state kept behind `KVServer::state`.
#[derive(Default)]
struct KVServerState {
    /// The `me` value supplied to `KVServer::new` (also passed to `Raft::new`).
    me: usize,
    /// The key-value map that applied operations read and write.
    kv: HashMap<String, String>,
    /// NOTE(review): not referenced anywhere in the visible code.
    debug_kv: HashMap<String, String>,
    /// For each clerk, the ID of its most recently applied request together
    /// with the result that request produced.
    applied_op: HashMap<ClerkId, (UniqueId, CommitResult)>,
    /// Result holders for requests currently waiting to be applied, keyed by
    /// request ID.
    queries: HashMap<UniqueId, Arc<ResultHolder>>,
}
/// An operation that can be replicated and applied to the key-value map.
#[derive(Clone, Serialize, Deserialize)]
enum KVOp {
    /// Does nothing when applied; also the `Default` value.
    NoOp,
    /// Reads the value stored under the given key.
    Get(String),
    /// Stores the value (second field) under the key (first field),
    /// replacing any existing value.
    Put(String, String),
    /// Appends the value (second field) to whatever is stored under the key
    /// (first field), creating the entry if it does not exist.
    Append(String, String),
}
  38. impl Default for KVOp {
  39. fn default() -> Self {
  40. KVOp::NoOp
  41. }
  42. }
/// A slot through which the thread applying operations hands a result to the
/// request thread(s) waiting for it.
struct ResultHolder {
    /// The outcome of the request. Created holding `Err(CommitError::TimedOut)`
    /// and overwritten with `Ok(..)` once the operation has been applied.
    result: Mutex<Result<CommitResult, CommitError>>,
    /// Notified after `result` has been filled in.
    condvar: Condvar,
}
/// The outcome of an operation that has been applied.
#[derive(Clone, Debug)]
enum CommitResult {
    /// Result of a `Get`: the stored value, or `None` if the key was absent.
    Get(Option<String>),
    /// A `Put` was applied.
    Put,
    /// An `Append` was applied.
    Append,
}
/// Reasons a request did not yield a fresh `CommitResult`.
#[derive(Clone, Debug)]
enum CommitError {
    /// Raft declined to start the operation.
    NotLeader,
    /// A request with a greater ID from the same clerk has already been
    /// applied; carries the ID of the rejected request.
    Expired(UniqueId),
    /// No result was available when the wait ended. Also the initial value
    /// of every `ResultHolder`.
    TimedOut,
    /// NOTE(review): never constructed in the visible code; only converted to
    /// `KVError::Conflict`.
    Conflict,
    /// The request had already been applied (or its result already handed to
    /// another waiter); carries the stored result.
    Duplicate(CommitResult),
}
  61. impl From<CommitError> for KVError {
  62. fn from(err: CommitError) -> Self {
  63. match err {
  64. CommitError::NotLeader => KVError::NotLeader,
  65. CommitError::Expired(_) => KVError::Expired,
  66. CommitError::TimedOut => KVError::TimedOut,
  67. CommitError::Conflict => KVError::Conflict,
  68. CommitError::Duplicate(_) => panic!("Duplicate is not a KVError"),
  69. }
  70. }
  71. }
  72. impl KVServer {
  73. pub fn new(
  74. servers: Vec<RpcClient>,
  75. me: usize,
  76. persister: Arc<dyn Persister>,
  77. ) -> Arc<Self> {
  78. let (tx, rx) = channel();
  79. let apply_command = move |index, command| {
  80. tx.send((index, command))
  81. .expect("The receiving end of apply command channel should have not been dropped");
  82. };
  83. let ret = Arc::new(Self {
  84. state: Mutex::new(KVServerState {
  85. me,
  86. ..Default::default()
  87. }),
  88. rf: Mutex::new(Raft::new(
  89. servers,
  90. me,
  91. persister,
  92. apply_command,
  93. None,
  94. Raft::<UniqueKVOp>::NO_SNAPSHOT,
  95. )),
  96. });
  97. ret.clone().process_command(rx);
  98. ret
  99. }
  100. fn apply_op(&self, unique_id: UniqueId, op: KVOp) {
  101. // The borrow checker does not allow borrowing two fields of an instance
  102. // inside a MutexGuard. But it does allow borrowing two fields of the
  103. // instance itself. Calling deref_mut() on the MutexGuard works, too!
  104. let state = &mut *self.state.lock();
  105. let (applied_op, kv) = (&mut state.applied_op, &mut state.kv);
  106. let entry = applied_op.entry(unique_id.clerk_id);
  107. if let Entry::Occupied(curr) = &entry {
  108. let (applied_unique_id, _) = curr.get();
  109. if *applied_unique_id >= unique_id {
  110. // Redelivered.
  111. return;
  112. }
  113. }
  114. let result = match op {
  115. KVOp::NoOp => return,
  116. KVOp::Get(key) => CommitResult::Get(kv.get(&key).cloned()),
  117. KVOp::Put(key, value) => {
  118. kv.insert(key, value);
  119. CommitResult::Put
  120. }
  121. KVOp::Append(key, value) => {
  122. kv.entry(key)
  123. .and_modify(|str| str.push_str(&value))
  124. .or_insert(value);
  125. CommitResult::Append
  126. }
  127. };
  128. match entry {
  129. Entry::Occupied(mut curr) => {
  130. curr.insert((unique_id, result.clone()));
  131. }
  132. Entry::Vacant(vacant) => {
  133. vacant.insert((unique_id, result.clone()));
  134. }
  135. }
  136. if let Some(result_holder) = state.queries.remove(&unique_id) {
  137. *result_holder.result.lock() = Ok(result);
  138. result_holder.condvar.notify_all();
  139. };
  140. }
  141. fn process_command(
  142. self: Arc<Self>,
  143. command_channel: Receiver<IndexedCommand>,
  144. ) {
  145. std::thread::spawn(move || {
  146. while let Ok((_, command)) = command_channel.recv() {
  147. self.apply_op(command.unique_id, command.op);
  148. }
  149. });
  150. }
  151. fn block_for_commit(
  152. &self,
  153. unique_id: UniqueId,
  154. op: KVOp,
  155. timeout: Duration,
  156. ) -> Result<CommitResult, CommitError> {
  157. let result_holder = {
  158. let mut state = self.state.lock();
  159. let applied = state.applied_op.get(&unique_id.clerk_id);
  160. if let Some((applied_unique_id, result)) = applied {
  161. if unique_id < *applied_unique_id {
  162. return Err(CommitError::Expired(unique_id));
  163. } else if unique_id == *applied_unique_id {
  164. return Err(CommitError::Duplicate(result.clone()));
  165. }
  166. };
  167. let entry = state.queries.entry(unique_id).or_insert_with(|| {
  168. Arc::new(ResultHolder {
  169. result: Mutex::new(Err(CommitError::TimedOut)),
  170. condvar: Condvar::new(),
  171. })
  172. });
  173. entry.clone()
  174. };
  175. let op = UniqueKVOp { op, unique_id };
  176. if self.rf.lock().start(op).is_none() {
  177. return Err(CommitError::NotLeader);
  178. }
  179. let mut guard = result_holder.result.lock();
  180. // Wait for the op to be committed.
  181. result_holder.condvar.wait_for(&mut guard, timeout);
  182. // Copy the result out.
  183. let result = guard.clone();
  184. // If the result is OK, all other requests should see "Duplicate".
  185. if let Ok(result) = guard.clone() {
  186. *guard = Err(CommitError::Duplicate(result))
  187. }
  188. return result;
  189. }
    /// How long `get` and `put_append` wait for an operation to be applied
    /// before giving up.
    const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2);
  191. pub fn get(&self, args: GetArgs) -> GetReply {
  192. let (is_retry, result) = match self.block_for_commit(
  193. args.unique_id,
  194. KVOp::Get(args.key),
  195. Self::DEFAULT_TIMEOUT,
  196. ) {
  197. Ok(result) => (false, result),
  198. Err(CommitError::Duplicate(result)) => (true, result),
  199. Err(e) => {
  200. return GetReply {
  201. result: Err(e.into()),
  202. is_retry: false,
  203. }
  204. }
  205. };
  206. let result = match result {
  207. CommitResult::Get(result) => Ok(result),
  208. CommitResult::Put => Err(KVError::Conflict),
  209. CommitResult::Append => Err(KVError::Conflict),
  210. };
  211. GetReply { result, is_retry }
  212. }
  213. pub fn put_append(&self, args: PutAppendArgs) -> PutAppendReply {
  214. let op = match args.op {
  215. PutAppendEnum::Put => KVOp::Put(args.key, args.value),
  216. PutAppendEnum::Append => KVOp::Append(args.key, args.value),
  217. };
  218. let result = match self.block_for_commit(
  219. args.unique_id,
  220. op,
  221. Self::DEFAULT_TIMEOUT,
  222. ) {
  223. Ok(result) => result,
  224. Err(CommitError::Duplicate(result)) => result,
  225. Err(e) => {
  226. return PutAppendReply {
  227. result: Err(e.into()),
  228. }
  229. }
  230. };
  231. let result = match result {
  232. CommitResult::Put => {
  233. if args.op == PutAppendEnum::Put {
  234. Ok(())
  235. } else {
  236. Err(KVError::Conflict)
  237. }
  238. }
  239. CommitResult::Append => {
  240. if args.op == PutAppendEnum::Append {
  241. Ok(())
  242. } else {
  243. Err(KVError::Conflict)
  244. }
  245. }
  246. CommitResult::Get(_) => Err(KVError::Conflict),
  247. };
  248. PutAppendReply { result }
  249. }
  250. pub fn raft(&self) -> Raft<UniqueKVOp> {
  251. self.rf.lock().clone()
  252. }
  253. pub fn kill(self) {
  254. self.rf.into_inner().kill()
  255. // The process_command thread will exit, after Raft drops the reference
  256. // to the sender.
  257. }
  258. }