server.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. use super::common::{
  2. ClerkId, GetArgs, GetReply, KVError, PutAppendArgs, PutAppendEnum,
  3. PutAppendReply, UniqueId,
  4. };
  5. use parking_lot::{Condvar, Mutex};
  6. use ruaft::{Persister, Raft, RpcClient};
  7. use std::collections::HashMap;
  8. use std::sync::mpsc::{channel, Receiver};
  9. use std::sync::Arc;
  10. use std::time::Duration;
  11. struct KVServer {
  12. state: Mutex<KVServerState>,
  13. rf: Mutex<Raft<UniqueKVOp>>,
  14. // snapshot
  15. }
  16. type IndexedCommand = (usize, UniqueKVOp);
  17. #[derive(Clone, Default, Serialize, Deserialize)]
  18. struct UniqueKVOp {
  19. op: KVOp,
  20. unique_id: UniqueId,
  21. }
  22. #[derive(Default)]
  23. struct KVServerState {
  24. kv: HashMap<String, String>,
  25. debug_kv: HashMap<String, String>,
  26. applied_op: HashMap<ClerkId, UniqueKVOpStep>,
  27. }
  28. #[derive(Clone, Serialize, Deserialize)]
  29. enum KVOp {
  30. NoOp,
  31. Get(GetOp),
  32. Put(PutAppendOp),
  33. Append(PutAppendOp),
  34. }
  35. impl Default for KVOp {
  36. fn default() -> Self {
  37. KVOp::NoOp
  38. }
  39. }
  40. #[derive(Clone, Serialize, Deserialize)]
  41. struct GetOp {
  42. key: String,
  43. }
  44. #[derive(Clone, Serialize, Deserialize)]
  45. struct PutAppendOp {
  46. key: String,
  47. value: String,
  48. }
  49. struct UniqueKVOpStep {
  50. step: KVOpStep,
  51. unique_id: UniqueId,
  52. }
  53. enum KVOpStep {
  54. Unseen,
  55. Pending(Arc<ResultHolder>),
  56. Done(CommitResult),
  57. }
  58. struct ResultHolder {
  59. result: Mutex<Result<CommitResult, CommitError>>,
  60. condvar: Condvar,
  61. }
  /// Result of applying one operation to the map.
  #[derive(Clone, Debug)]
  enum CommitResult {
      /// Value read by a `Get`; `None` if the key was absent.
      Get(Option<String>),
      /// A `Put` was applied.
      Put,
      /// An `Append` was applied.
      Append,
  }
  68. #[derive(Clone, Debug)]
  69. enum CommitError {
  70. NotLeader,
  71. Expired(UniqueId),
  72. TimedOut,
  73. Conflict,
  74. Duplicate(CommitResult),
  75. }
  76. impl From<CommitError> for KVError {
  77. fn from(err: CommitError) -> Self {
  78. match err {
  79. CommitError::NotLeader => KVError::NotLeader,
  80. CommitError::Expired(_) => KVError::Expired,
  81. CommitError::TimedOut => KVError::TimedOut,
  82. CommitError::Conflict => KVError::Conflict,
  83. CommitError::Duplicate(_) => panic!("Duplicate is not a KVError"),
  84. }
  85. }
  86. }
  87. impl KVServerState {
  88. fn find_op_or_unseen(
  89. &mut self,
  90. unique_id: UniqueId,
  91. ) -> &mut UniqueKVOpStep {
  92. self.applied_op
  93. .entry(unique_id.clerk_id)
  94. .and_modify(|e| {
  95. if let KVOpStep::Unseen = e.step {
  96. panic!("Unseen op should never been here.")
  97. }
  98. })
  99. .or_insert_with(|| UniqueKVOpStep {
  100. step: KVOpStep::Unseen,
  101. unique_id,
  102. })
  103. }
  104. }
  105. impl KVServer {
  106. pub fn new(
  107. servers: Vec<RpcClient>,
  108. me: usize,
  109. persister: Arc<dyn Persister>,
  110. ) -> Arc<Self> {
  111. let (tx, rx) = channel();
  112. let apply_command = move |index, command| {
  113. tx.send((index, command))
  114. .expect("The receiving end of apply command channel should have not been dropped");
  115. };
  116. let ret = Arc::new(Self {
  117. state: Default::default(),
  118. rf: Mutex::new(Raft::new(
  119. servers,
  120. me,
  121. persister,
  122. apply_command,
  123. None,
  124. Raft::<UniqueKVOp>::NO_SNAPSHOT,
  125. )),
  126. });
  127. ret.clone().process_command(rx);
  128. ret
  129. }
  130. fn apply_op(&self, unique_id: UniqueId, op: KVOp) {
  131. let mut state = self.state.lock();
  132. {
  133. let curr_op = state.find_op_or_unseen(unique_id);
  134. if unique_id < curr_op.unique_id {
  135. // Redelivered.
  136. return;
  137. }
  138. if unique_id == curr_op.unique_id {
  139. if let KVOpStep::Done(_) = curr_op.step {
  140. // Redelivered.
  141. return;
  142. }
  143. }
  144. }
  145. let result = match op {
  146. KVOp::NoOp => return,
  147. KVOp::Get(op) => CommitResult::Get(state.kv.get(&op.key).cloned()),
  148. KVOp::Put(op) => {
  149. state.kv.insert(op.key, op.value);
  150. CommitResult::Put
  151. }
  152. KVOp::Append(op) => {
  153. let (key, value) = (op.key, op.value);
  154. state
  155. .kv
  156. .entry(key)
  157. .and_modify(|str| str.push_str(&value))
  158. .or_insert(value);
  159. CommitResult::Append
  160. }
  161. };
  162. let last_op = std::mem::replace(
  163. state.applied_op.get_mut(&unique_id.clerk_id).expect(""),
  164. UniqueKVOpStep {
  165. step: KVOpStep::Done(result.clone()),
  166. unique_id,
  167. },
  168. );
  169. assert!(unique_id >= last_op.unique_id);
  170. if let KVOpStep::Pending(result_holder) = last_op.step {
  171. *result_holder.result.lock() = if unique_id == last_op.unique_id {
  172. Ok(result)
  173. } else {
  174. Err(CommitError::Expired(last_op.unique_id))
  175. };
  176. result_holder.condvar.notify_all();
  177. }
  178. }
  179. fn process_command(
  180. self: Arc<Self>,
  181. command_channel: Receiver<IndexedCommand>,
  182. ) {
  183. std::thread::spawn(move || {
  184. while let Ok((_, command)) = command_channel.recv() {
  185. self.apply_op(command.unique_id, command.op);
  186. }
  187. });
  188. }
  189. fn block_for_commit(
  190. &self,
  191. unique_id: UniqueId,
  192. op: KVOp,
  193. timeout: Duration,
  194. ) -> Result<CommitResult, CommitError> {
  195. let (unseen, result_holder) = {
  196. let mut state = self.state.lock();
  197. let last_result = state.find_op_or_unseen(unique_id);
  198. // We know that the two unique_ids must come from the same clerk,
  199. // because they are found in the same entry of applied_op.
  200. assert_eq!(unique_id.clerk_id, last_result.unique_id.clerk_id);
  201. // This is a newer request
  202. if unique_id > last_result.unique_id {
  203. last_result.unique_id = unique_id;
  204. match &last_result.step {
  205. KVOpStep::Unseen => {
  206. panic!("Unseen results should never be seen.")
  207. }
  208. // Notify all threads that are still waiting that a new
  209. // request has arrived. This should never happen.
  210. KVOpStep::Pending(result_holder) => {
  211. result_holder.condvar.notify_all();
  212. }
  213. KVOpStep::Done(_) => {}
  214. }
  215. last_result.step = KVOpStep::Unseen;
  216. }
  217. // Now we know unique_id <= last_result.unique_id.
  218. assert!(unique_id <= last_result.unique_id);
  219. match &last_result.step {
  220. KVOpStep::Unseen => {
  221. let result_holder = Arc::new(ResultHolder {
  222. // The default error is timed-out, if no one touches the
  223. // result holder at all.
  224. result: Mutex::new(Err(CommitError::TimedOut)),
  225. condvar: Condvar::new(),
  226. });
  227. last_result.step = KVOpStep::Pending(result_holder.clone());
  228. (true, result_holder)
  229. }
  230. // The operation is still pending.
  231. KVOpStep::Pending(result_holder) => {
  232. (false, result_holder.clone())
  233. }
  234. // The operation is a Get
  235. KVOpStep::Done(CommitResult::Get(value)) => {
  236. return if unique_id == last_result.unique_id {
  237. // This is the same operation as the last one
  238. Ok(CommitResult::Get(value.clone()))
  239. } else {
  240. // A past Get operation is being retried. We do not
  241. // know the proper value to return.
  242. Err(CommitError::Expired(unique_id))
  243. };
  244. }
  245. // For Put & Append operations, all we know is that all past
  246. // operations must have been committed, returning OK.
  247. KVOpStep::Done(result) => return Ok(result.clone()),
  248. }
  249. };
  250. if unseen {
  251. let op = UniqueKVOp { op, unique_id };
  252. if self.rf.lock().start(op).is_none() {
  253. return Err(CommitError::NotLeader);
  254. }
  255. }
  256. let mut result = result_holder.result.lock();
  257. // Wait for the op to be committed.
  258. result_holder.condvar.wait_for(&mut result, timeout);
  259. let result = result.clone();
  260. return if let Ok(result) = result {
  261. if unseen {
  262. Ok(result)
  263. } else {
  264. Err(CommitError::Duplicate(result))
  265. }
  266. } else {
  267. result
  268. };
  269. }
  270. const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2);
  271. pub fn get(&self, args: GetArgs) -> GetReply {
  272. let (is_retry, result) = match self.block_for_commit(
  273. args.unique_id,
  274. KVOp::Get(GetOp { key: args.key }),
  275. Self::DEFAULT_TIMEOUT,
  276. ) {
  277. Ok(result) => (false, result),
  278. Err(CommitError::Duplicate(result)) => (true, result),
  279. Err(e) => {
  280. return GetReply {
  281. result: Err(e.into()),
  282. is_retry: false,
  283. }
  284. }
  285. };
  286. let result = match result {
  287. CommitResult::Get(result) => Ok(result),
  288. CommitResult::Put => Err(KVError::Conflict),
  289. CommitResult::Append => Err(KVError::Conflict),
  290. };
  291. GetReply { result, is_retry }
  292. }
  293. pub fn put_append(&self, args: PutAppendArgs) -> PutAppendReply {
  294. let op = PutAppendOp {
  295. key: args.key,
  296. value: args.value,
  297. };
  298. let op = match args.op {
  299. PutAppendEnum::Put => KVOp::Put(op),
  300. PutAppendEnum::Append => KVOp::Append(op),
  301. };
  302. let result = match self.block_for_commit(
  303. args.unique_id,
  304. op,
  305. Self::DEFAULT_TIMEOUT,
  306. ) {
  307. Ok(result) => result,
  308. Err(CommitError::Duplicate(result)) => result,
  309. Err(e) => {
  310. return PutAppendReply {
  311. result: Err(e.into()),
  312. }
  313. }
  314. };
  315. let result = match result {
  316. CommitResult::Put => {
  317. if args.op == PutAppendEnum::Put {
  318. Ok(())
  319. } else {
  320. Err(KVError::Conflict)
  321. }
  322. }
  323. CommitResult::Append => {
  324. if args.op == PutAppendEnum::Append {
  325. Ok(())
  326. } else {
  327. Err(KVError::Conflict)
  328. }
  329. }
  330. CommitResult::Get(_) => Err(KVError::Conflict),
  331. };
  332. PutAppendReply { result }
  333. }
  334. pub fn kill(self) {
  335. self.rf.into_inner().kill()
  336. // The process_command thread will exit, after Raft drops the reference
  337. // to the sender.
  338. }
  339. }