server.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. use super::common::{
  2. ClerkId, GetArgs, GetReply, KVError, PutAppendArgs, PutAppendEnum,
  3. PutAppendReply, UniqueId,
  4. };
  5. use parking_lot::{Condvar, Mutex};
  6. use ruaft::{Persister, Raft, RpcClient};
  7. use std::collections::HashMap;
  8. use std::sync::mpsc::{channel, Receiver};
  9. use std::sync::Arc;
  10. use std::time::Duration;
  11. struct KVServer {
  12. state: Mutex<KVServerState>,
  13. rf: Mutex<Raft<UniqueKVOp>>,
  14. // snapshot
  15. }
  16. type IndexedCommand = (usize, UniqueKVOp);
  17. #[derive(Clone, Default, Serialize, Deserialize)]
  18. struct UniqueKVOp {
  19. op: KVOp,
  20. unique_id: UniqueId,
  21. }
  22. #[derive(Default)]
  23. struct KVServerState {
  24. kv: HashMap<String, String>,
  25. debug_kv: HashMap<String, String>,
  26. applied_op: HashMap<ClerkId, UniqueKVOpStep>,
  27. }
  28. #[derive(Clone, Serialize, Deserialize)]
  29. enum KVOp {
  30. NoOp,
  31. Get(String),
  32. Put(String, String),
  33. Append(String, String),
  34. }
  35. impl Default for KVOp {
  36. fn default() -> Self {
  37. KVOp::NoOp
  38. }
  39. }
  40. struct UniqueKVOpStep {
  41. step: KVOpStep,
  42. unique_id: UniqueId,
  43. }
  44. enum KVOpStep {
  45. Unseen,
  46. Pending(Arc<ResultHolder>),
  47. Done(CommitResult),
  48. }
  49. struct ResultHolder {
  50. result: Mutex<Result<CommitResult, CommitError>>,
  51. condvar: Condvar,
  52. }
  /// The outcome of an operation that has been applied to the key-value map.
  #[derive(Clone, Debug)]
  enum CommitResult {
      /// Result of a `Get`: the value found under the key, if any.
      Get(Option<String>),
      /// A `Put` was applied.
      Put,
      /// An `Append` was applied.
      Append,
  }
  59. #[derive(Clone, Debug)]
  60. enum CommitError {
  61. NotLeader,
  62. Expired(UniqueId),
  63. TimedOut,
  64. Conflict,
  65. Duplicate(CommitResult),
  66. }
  67. impl From<CommitError> for KVError {
  68. fn from(err: CommitError) -> Self {
  69. match err {
  70. CommitError::NotLeader => KVError::NotLeader,
  71. CommitError::Expired(_) => KVError::Expired,
  72. CommitError::TimedOut => KVError::TimedOut,
  73. CommitError::Conflict => KVError::Conflict,
  74. CommitError::Duplicate(_) => panic!("Duplicate is not a KVError"),
  75. }
  76. }
  77. }
  78. impl KVServer {
  79. pub fn new(
  80. servers: Vec<RpcClient>,
  81. me: usize,
  82. persister: Arc<dyn Persister>,
  83. ) -> Arc<Self> {
  84. let (tx, rx) = channel();
  85. let apply_command = move |index, command| {
  86. tx.send((index, command))
  87. .expect("The receiving end of apply command channel should have not been dropped");
  88. };
  89. let ret = Arc::new(Self {
  90. state: Default::default(),
  91. rf: Mutex::new(Raft::new(
  92. servers,
  93. me,
  94. persister,
  95. apply_command,
  96. None,
  97. Raft::<UniqueKVOp>::NO_SNAPSHOT,
  98. )),
  99. });
  100. ret.clone().process_command(rx);
  101. ret
  102. }
  103. fn find_op_or_unseen(
  104. applied_op: &mut HashMap<ClerkId, UniqueKVOpStep>,
  105. unique_id: UniqueId,
  106. ) -> &mut UniqueKVOpStep {
  107. let ret = applied_op
  108. .entry(unique_id.clerk_id)
  109. .and_modify(|e| {
  110. if let KVOpStep::Unseen = e.step {
  111. panic!("Unseen op should never been here.")
  112. }
  113. })
  114. .or_insert_with(|| UniqueKVOpStep {
  115. step: KVOpStep::Unseen,
  116. unique_id,
  117. });
  118. // We know that the two unique_ids must come from the same clerk,
  119. // because they are found in the same entry of applied_op.
  120. assert_eq!(unique_id.clerk_id, ret.unique_id.clerk_id);
  121. ret
  122. }
  123. fn apply_op(&self, unique_id: UniqueId, op: KVOp) {
  124. // The borrow checker does not allow borrowing two fields of an instance
  125. // inside a MutexGuard. But it does allow borrowing two fields of the
  126. // instance itself. Calling deref_mut() on the MutexGuard works, too!
  127. let state = &mut *self.state.lock();
  128. let (applied_op, kv) = (&mut state.applied_op, &mut state.kv);
  129. let curr_op = Self::find_op_or_unseen(applied_op, unique_id);
  130. if unique_id < curr_op.unique_id {
  131. // Redelivered.
  132. return;
  133. }
  134. if unique_id == curr_op.unique_id {
  135. if let KVOpStep::Done(_) = curr_op.step {
  136. // Redelivered.
  137. return;
  138. }
  139. }
  140. assert!(unique_id >= curr_op.unique_id);
  141. let result = match op {
  142. KVOp::NoOp => return,
  143. KVOp::Get(key) => CommitResult::Get(kv.get(&key).cloned()),
  144. KVOp::Put(key, value) => {
  145. kv.insert(key, value);
  146. CommitResult::Put
  147. }
  148. KVOp::Append(key, value) => {
  149. kv.entry(key)
  150. .and_modify(|str| str.push_str(&value))
  151. .or_insert(value);
  152. CommitResult::Append
  153. }
  154. };
  155. let last_op = std::mem::replace(
  156. curr_op,
  157. UniqueKVOpStep {
  158. step: KVOpStep::Done(result.clone()),
  159. unique_id,
  160. },
  161. );
  162. assert!(unique_id >= last_op.unique_id);
  163. if let KVOpStep::Pending(result_holder) = last_op.step {
  164. *result_holder.result.lock() = if unique_id == last_op.unique_id {
  165. Ok(result)
  166. } else {
  167. Err(CommitError::Expired(last_op.unique_id))
  168. };
  169. result_holder.condvar.notify_all();
  170. }
  171. }
  172. fn process_command(
  173. self: Arc<Self>,
  174. command_channel: Receiver<IndexedCommand>,
  175. ) {
  176. std::thread::spawn(move || {
  177. while let Ok((_, command)) = command_channel.recv() {
  178. self.apply_op(command.unique_id, command.op);
  179. }
  180. });
  181. }
  182. fn block_for_commit(
  183. &self,
  184. unique_id: UniqueId,
  185. op: KVOp,
  186. timeout: Duration,
  187. ) -> Result<CommitResult, CommitError> {
  188. let (unseen, result_holder) = {
  189. let mut state = self.state.lock();
  190. let curr_op =
  191. Self::find_op_or_unseen(&mut state.applied_op, unique_id);
  192. // This is a newer request
  193. if unique_id > curr_op.unique_id {
  194. let last_op = std::mem::replace(
  195. curr_op,
  196. UniqueKVOpStep {
  197. step: KVOpStep::Unseen,
  198. unique_id,
  199. },
  200. );
  201. match last_op.step {
  202. KVOpStep::Unseen => {
  203. panic!("Unseen results should never be seen.")
  204. }
  205. // Notify all threads that are still waiting that a new
  206. // request has arrived. This should never happen.
  207. KVOpStep::Pending(result_holder) => {
  208. *result_holder.result.lock() =
  209. Err(CommitError::Expired(last_op.unique_id));
  210. result_holder.condvar.notify_all();
  211. }
  212. KVOpStep::Done(_) => {}
  213. }
  214. }
  215. // Now we know unique_id <= curr_op.unique_id.
  216. assert!(unique_id <= curr_op.unique_id);
  217. match &curr_op.step {
  218. KVOpStep::Unseen => {
  219. let result_holder = Arc::new(ResultHolder {
  220. // The default error is timed-out, if no one touches the
  221. // result holder at all.
  222. result: Mutex::new(Err(CommitError::TimedOut)),
  223. condvar: Condvar::new(),
  224. });
  225. curr_op.step = KVOpStep::Pending(result_holder.clone());
  226. (true, result_holder)
  227. }
  228. // The operation is still pending.
  229. KVOpStep::Pending(result_holder) => {
  230. (false, result_holder.clone())
  231. }
  232. // The operation is a Get
  233. KVOpStep::Done(CommitResult::Get(value)) => {
  234. return if unique_id == curr_op.unique_id {
  235. // This is the same operation as the last one
  236. Ok(CommitResult::Get(value.clone()))
  237. } else {
  238. // A past Get operation is being retried. We do not
  239. // know the proper value to return.
  240. Err(CommitError::Expired(unique_id))
  241. };
  242. }
  243. // For Put & Append operations, all we know is that all past
  244. // operations must have been committed, returning OK.
  245. KVOpStep::Done(result) => return Ok(result.clone()),
  246. }
  247. };
  248. if unseen {
  249. let op = UniqueKVOp { op, unique_id };
  250. if self.rf.lock().start(op).is_none() {
  251. return Err(CommitError::NotLeader);
  252. }
  253. }
  254. let mut result = result_holder.result.lock();
  255. // Wait for the op to be committed.
  256. result_holder.condvar.wait_for(&mut result, timeout);
  257. let result = result.clone();
  258. return if let Ok(result) = result {
  259. if unseen {
  260. Ok(result)
  261. } else {
  262. Err(CommitError::Duplicate(result))
  263. }
  264. } else {
  265. result
  266. };
  267. }
  268. const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2);
  269. pub fn get(&self, args: GetArgs) -> GetReply {
  270. let (is_retry, result) = match self.block_for_commit(
  271. args.unique_id,
  272. KVOp::Get(args.key),
  273. Self::DEFAULT_TIMEOUT,
  274. ) {
  275. Ok(result) => (false, result),
  276. Err(CommitError::Duplicate(result)) => (true, result),
  277. Err(e) => {
  278. return GetReply {
  279. result: Err(e.into()),
  280. is_retry: false,
  281. }
  282. }
  283. };
  284. let result = match result {
  285. CommitResult::Get(result) => Ok(result),
  286. CommitResult::Put => Err(KVError::Conflict),
  287. CommitResult::Append => Err(KVError::Conflict),
  288. };
  289. GetReply { result, is_retry }
  290. }
  291. pub fn put_append(&self, args: PutAppendArgs) -> PutAppendReply {
  292. let op = match args.op {
  293. PutAppendEnum::Put => KVOp::Put(args.key, args.value),
  294. PutAppendEnum::Append => KVOp::Append(args.key, args.value),
  295. };
  296. let result = match self.block_for_commit(
  297. args.unique_id,
  298. op,
  299. Self::DEFAULT_TIMEOUT,
  300. ) {
  301. Ok(result) => result,
  302. Err(CommitError::Duplicate(result)) => result,
  303. Err(e) => {
  304. return PutAppendReply {
  305. result: Err(e.into()),
  306. }
  307. }
  308. };
  309. let result = match result {
  310. CommitResult::Put => {
  311. if args.op == PutAppendEnum::Put {
  312. Ok(())
  313. } else {
  314. Err(KVError::Conflict)
  315. }
  316. }
  317. CommitResult::Append => {
  318. if args.op == PutAppendEnum::Append {
  319. Ok(())
  320. } else {
  321. Err(KVError::Conflict)
  322. }
  323. }
  324. CommitResult::Get(_) => Err(KVError::Conflict),
  325. };
  326. PutAppendReply { result }
  327. }
  328. pub fn kill(self) {
  329. self.rf.into_inner().kill()
  330. // The process_command thread will exit, after Raft drops the reference
  331. // to the sender.
  332. }
  333. }