// server.rs
  1. use super::common::{
  2. ClerkId, GetArgs, GetReply, KVError, PutAppendArgs, PutAppendEnum,
  3. PutAppendReply, UniqueId,
  4. };
  5. use parking_lot::{Condvar, Mutex};
  6. use ruaft::{Persister, Raft, RpcClient};
  7. use std::collections::HashMap;
  8. use std::sync::mpsc::{channel, Receiver};
  9. use std::sync::Arc;
  10. use std::time::Duration;
  11. struct KVServer {
  12. state: Mutex<KVServerState>,
  13. rf: Mutex<Raft<UniqueKVOp>>,
  14. // snapshot
  15. }
  16. type IndexedCommand = (usize, UniqueKVOp);
  17. #[derive(Clone, Default, Serialize, Deserialize)]
  18. struct UniqueKVOp {
  19. op: KVOp,
  20. unique_id: UniqueId,
  21. }
  22. #[derive(Default)]
  23. struct KVServerState {
  24. kv: HashMap<String, String>,
  25. debug_kv: HashMap<String, String>,
  26. applied_op: HashMap<ClerkId, UniqueKVOpStep>,
  27. }
  28. #[derive(Clone, Serialize, Deserialize)]
  29. enum KVOp {
  30. NoOp,
  31. Get(GetOp),
  32. Put(PutAppendOp),
  33. Append(PutAppendOp),
  34. }
  35. impl Default for KVOp {
  36. fn default() -> Self {
  37. KVOp::NoOp
  38. }
  39. }
  40. #[derive(Clone, Serialize, Deserialize)]
  41. struct GetOp {
  42. key: String,
  43. }
  44. #[derive(Clone, Serialize, Deserialize)]
  45. struct PutAppendOp {
  46. key: String,
  47. value: String,
  48. }
  49. struct UniqueKVOpStep {
  50. step: KVOpStep,
  51. unique_id: UniqueId,
  52. }
  53. enum KVOpStep {
  54. Unseen,
  55. Pending(Arc<Condvar>),
  56. Done(CommitResult),
  57. }
  /// Outcome of applying a `KVOp` to the store.
  ///
  /// `PartialEq`/`Eq` are derived so results can be compared directly (for
  /// example in assertions); this is purely additive for existing users.
  #[derive(Clone, Debug, PartialEq, Eq)]
  enum CommitResult {
      /// Value found for the requested key, or `None` when the key is absent.
      Get(Option<String>),
      /// A Put was applied.
      Put,
      /// An Append was applied.
      Append,
  }
  64. #[derive(Debug)]
  65. enum CommitError {
  66. NotLeader,
  67. Expired(UniqueId),
  68. TimedOut,
  69. Conflict,
  70. Duplicate(CommitResult),
  71. }
  72. impl From<CommitError> for KVError {
  73. fn from(err: CommitError) -> Self {
  74. match err {
  75. CommitError::NotLeader => KVError::NotLeader,
  76. CommitError::Expired(_) => KVError::Expired,
  77. CommitError::TimedOut => KVError::TimedOut,
  78. CommitError::Conflict => KVError::Conflict,
  79. CommitError::Duplicate(_) => panic!("Duplicate is not a KVError"),
  80. }
  81. }
  82. }
  83. impl KVServerState {
  84. fn find_op_or_unseen(
  85. &mut self,
  86. unique_id: UniqueId,
  87. ) -> &mut UniqueKVOpStep {
  88. self.applied_op
  89. .entry(unique_id.clerk_id)
  90. .and_modify(|e| {
  91. if e.step == KVOpStep::Unseen {
  92. panic!("Unseen op should never been here.")
  93. }
  94. })
  95. .or_insert_with(|| UniqueKVOpStep {
  96. step: KVOpStep::Unseen,
  97. unique_id,
  98. })
  99. }
  100. }
  101. impl KVServer {
  102. pub fn new(
  103. servers: Vec<RpcClient>,
  104. me: usize,
  105. persister: Arc<dyn Persister>,
  106. ) -> Arc<Self> {
  107. let (tx, rx) = channel();
  108. let apply_command = move |index, command| {
  109. tx.send((index, command))
  110. .expect("The receiving end of apply command channel should have not been dropped");
  111. };
  112. let ret = Arc::new(Self {
  113. state: Default::default(),
  114. rf: Mutex::new(Raft::new(
  115. servers,
  116. me,
  117. persister,
  118. apply_command,
  119. None,
  120. Raft::<UniqueKVOp>::NO_SNAPSHOT,
  121. )),
  122. });
  123. ret.clone().process_command(rx);
  124. ret
  125. }
  126. fn apply_op(&self, unique_id: UniqueId, op: KVOp) {
  127. let mut state = self.state.lock();
  128. let result = match op {
  129. KVOp::NoOp => return,
  130. KVOp::Get(op) => CommitResult::Get(state.kv.get(&op.key).cloned()),
  131. KVOp::Put(op) => {
  132. state.kv.insert(op.key, op.value);
  133. CommitResult::Put
  134. }
  135. KVOp::Append(op) => {
  136. let (key, value) = (op.key, op.value);
  137. state
  138. .kv
  139. .entry(key)
  140. .and_modify(|str| str.push_str(&value))
  141. .or_insert(value);
  142. CommitResult::Append
  143. }
  144. };
  145. let last_result = state.find_op_or_unseen(unique_id);
  146. if unique_id > last_result.unique_id {
  147. last_result.unique_id = unique_id
  148. }
  149. let last_step = std::mem::replace(&mut last_result.step, KVOpStep::Done(result);
  150. if let KVOpStep::Pending(condvar) = last_step {
  151. condvar.notify_all();
  152. }
  153. }
  154. fn process_command(
  155. self: Arc<Self>,
  156. command_channel: Receiver<IndexedCommand>,
  157. ) {
  158. std::thread::spawn(move || {
  159. while let Ok((_, command)) = command_channel.recv() {
  160. self.apply_op(command.unique_id, command.op);
  161. }
  162. });
  163. }
  164. fn block_for_commit(
  165. &self,
  166. unique_id: UniqueId,
  167. op: KVOp,
  168. timeout: Duration,
  169. ) -> Result<CommitResult, CommitError> {
  170. let (unseen, condvar) = {
  171. let mut state = self.state.lock();
  172. let last_result = state.find_op_or_unseen(unique_id);
  173. // We know that the two unique_ids must come from the same clerk,
  174. // because they are found in the same entry of applied_op.
  175. assert_eq!(unique_id.clerk_id, last_result.unique_id.clerk_id);
  176. // This is a newer request
  177. if unique_id > last_result.unique_id {
  178. last_result.unique_id = unique_id;
  179. match &last_result.step {
  180. KVOpStep::Unseen => {
  181. panic!("Unseen results should never be seen.")
  182. }
  183. // Notify all threads that are still waiting that a new
  184. // request has arrived. This should never happen.
  185. KVOpStep::Pending(condvar) => {
  186. condvar.notify_all();
  187. }
  188. KVOpStep::Done(_) => {}
  189. }
  190. last_result.step = KVOpStep::Unseen;
  191. }
  192. // Now we know unique_id <= last_result.unique_id.
  193. assert!(unique_id <= last_result.unique_id);
  194. match &last_result.step {
  195. KVOpStep::Unseen => {
  196. let condvar = Arc::new(Condvar::new());
  197. last_result.step = KVOpStep::Pending(condvar.clone());
  198. (true, condvar)
  199. }
  200. // The operation is still pending.
  201. KVOpStep::Pending(condvar) => (false, condvar.clone()),
  202. // The operation is a Get
  203. KVOpStep::Done(CommitResult::Get(value)) => {
  204. return if unique_id == last_result.unique_id {
  205. // This is the same operation as the last one
  206. Ok(CommitResult::Get(value.clone()))
  207. } else {
  208. // A past Get operation is being retried. We do not
  209. // know the proper value to return.
  210. Err(CommitError::Expired(unique_id))
  211. };
  212. }
  213. // For Put & Append operations, all we know is that all past
  214. // operations must have been committed, returning OK.
  215. KVOpStep::Done(result) => return Ok(result.clone()),
  216. }
  217. };
  218. if unseen {
  219. let op = UniqueKVOp { op, unique_id };
  220. if self.rf.lock().start(op).is_none() {
  221. return Err(CommitError::NotLeader);
  222. }
  223. }
  224. let mut state = self.state.lock();
  225. // Wait for the op to be committed.
  226. condvar.wait_for(&mut state, timeout);
  227. let step = state
  228. .applied_op
  229. .get(&unique_id.clerk_id)
  230. .ok_or_else(CommitError::Expired(unique_id))?;
  231. if unique_id != step.unique_id {
  232. // The clerk must have seen the result of this request because they
  233. // are sending in a new one. Just return error.
  234. return Err(CommitError::Expired(unique_id));
  235. }
  236. return if let KVOpStep::Done(result) = &step.step {
  237. if unseen {
  238. Ok(result.clone())
  239. } else {
  240. Err(CommitError::Duplicate(result.clone()))
  241. }
  242. } else {
  243. Err(CommitError::TimedOut)
  244. };
  245. }
  246. const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2);
  247. pub fn get(&self, args: GetArgs) -> GetReply {
  248. let (is_retry, result) = match self.block_for_commit(
  249. args.unique_id,
  250. KVOp::Get(GetOp { key: args.key }),
  251. Self::DEFAULT_TIMEOUT,
  252. ) {
  253. Ok(result) => (false, result),
  254. Err(CommitError::Duplicate(result)) => (true, result),
  255. Err(e) => {
  256. return GetReply {
  257. result: Err(e.into()),
  258. is_retry: false,
  259. }
  260. }
  261. };
  262. let result = match result {
  263. CommitResult::Get(result) => Ok(result),
  264. CommitResult::Put => Err(KVError::Conflict),
  265. CommitResult::Append => Err(KVError::Conflict),
  266. };
  267. GetReply { result, is_retry }
  268. }
  269. pub fn put_append(&self, args: PutAppendArgs) -> PutAppendReply {
  270. let op = PutAppendOp {
  271. key: args.key,
  272. value: args.value,
  273. };
  274. let op = match args.op {
  275. PutAppendEnum::Put => KVOp::Put(op),
  276. PutAppendEnum::Append => KVOp::Append(op),
  277. };
  278. let result = match self.block_for_commit(
  279. args.unique_id,
  280. op,
  281. Self::DEFAULT_TIMEOUT,
  282. ) {
  283. Ok(result) => result,
  284. Err(CommitError::Duplicate(result)) => result,
  285. Err(e) => {
  286. return PutAppendReply {
  287. result: Err(e.into()),
  288. }
  289. }
  290. };
  291. let result = match result {
  292. CommitResult::Put => {
  293. if args.op == PutAppendEnum::Put {
  294. Ok(())
  295. } else {
  296. Err(KVError::Conflict)
  297. }
  298. }
  299. CommitResult::Append => {
  300. if args.op == PutAppendEnum::Append {
  301. Ok(())
  302. } else {
  303. Err(KVError::Conflict)
  304. }
  305. }
  306. CommitResult::Get(_) => Err(KVError::Conflict),
  307. };
  308. PutAppendReply { result }
  309. }
  310. pub fn kill(self) {
  311. self.rf.into_inner().kill()
  312. // The process_command thread will exit, after Raft drops the reference
  313. // to the sender.
  314. }
  315. }