server.rs 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. use super::common::{ClerkId, UniqueId};
  2. use crate::kvraft::common::{
  3. GetArgs, GetReply, KVError, PutAppendArgs, PutAppendEnum, PutAppendReply,
  4. };
  5. use parking_lot::{Condvar, Mutex};
  6. use ruaft::{Persister, Raft, RpcClient};
  7. use std::collections::HashMap;
  8. use std::sync::atomic::{AtomicBool, Ordering};
  9. use std::sync::mpsc::{channel, Receiver};
  10. use std::sync::Arc;
  11. use std::time::Duration;
  12. struct KVServer {
  13. state: Mutex<KVServerState>,
  14. rf: Raft<UniqueKVOp>,
  15. command_channel: Receiver<(usize, UniqueKVOp)>,
  16. shutdown: AtomicBool,
  17. // snapshot
  18. }
  19. #[derive(Clone, Default, Serialize, Deserialize)]
  20. struct UniqueKVOp {
  21. op: KVOp,
  22. unique_id: UniqueId,
  23. }
  24. #[derive(Default)]
  25. struct KVServerState {
  26. kv: HashMap<String, String>,
  27. debug_kv: HashMap<String, String>,
  28. applied_op: HashMap<ClerkId, UniqueKVOpStep>,
  29. }
  30. #[derive(Clone, Serialize, Deserialize)]
  31. enum KVOp {
  32. NoOp,
  33. Get(GetOp),
  34. Put(PutAppendOp),
  35. Append(PutAppendOp),
  36. }
  37. impl Default for KVOp {
  38. fn default() -> Self {
  39. KVOp::NoOp
  40. }
  41. }
  42. #[derive(Clone, Serialize, Deserialize)]
  43. struct GetOp {
  44. key: String,
  45. }
  46. #[derive(Clone, Serialize, Deserialize)]
  47. struct PutAppendOp {
  48. key: String,
  49. value: String,
  50. }
  51. struct UniqueKVOpStep {
  52. step: KVOpStep,
  53. unique_id: UniqueId,
  54. }
  55. enum KVOpStep {
  56. Unseen,
  57. Pending(Arc<Condvar>),
  58. Done(CommitResult),
  59. }
  /// Outcome of an operation that finished.
  ///
  /// `PartialEq`/`Eq` are derived so outcomes can be compared directly, for
  /// example in assertions.
  #[derive(Clone, Debug, PartialEq, Eq)]
  enum CommitResult {
      /// Outcome of a get: the value, or `None` when there is none to return.
      Get(Option<String>),
      /// Outcome of a put.
      Put,
      /// Outcome of an append.
      Append,
  }
  66. #[derive(Debug)]
  67. enum CommitError {
  68. NotLeader,
  69. Expired(UniqueId),
  70. TimedOut,
  71. Conflict,
  72. Duplicate(CommitResult),
  73. }
  74. impl From<CommitError> for KVError {
  75. fn from(err: CommitError) -> Self {
  76. match err {
  77. CommitError::NotLeader => KVError::NotLeader,
  78. CommitError::Expired(_) => KVError::Expired,
  79. CommitError::TimedOut => KVError::TimedOut,
  80. CommitError::Conflict => KVError::Conflict,
  81. CommitError::Duplicate(_) => panic!("Duplicate is not a KVError"),
  82. }
  83. }
  84. }
  85. impl KVServer {
  86. pub fn new(
  87. servers: Vec<RpcClient>,
  88. me: usize,
  89. persister: Arc<dyn Persister>,
  90. ) -> Self {
  91. let (tx, rx) = channel();
  92. let apply_command = move |index, command| {
  93. tx.send((index, command))
  94. .expect("The receiving end of apply command channel should have not been dropped");
  95. };
  96. Self {
  97. state: Default::default(),
  98. rf: Raft::new(
  99. servers,
  100. me,
  101. persister,
  102. apply_command,
  103. None,
  104. Raft::<UniqueKVOp>::NO_SNAPSHOT,
  105. ),
  106. command_channel: rx,
  107. shutdown: AtomicBool::new(false),
  108. }
  109. }
  110. fn block_for_commit(
  111. &self,
  112. unique_id: UniqueId,
  113. op: KVOp,
  114. timeout: Duration,
  115. ) -> Result<CommitResult, CommitError> {
  116. let (unseen, condvar) = {
  117. let mut state = self.state.lock();
  118. let last_result = state
  119. .applied_op
  120. .entry(unique_id.clerk_id)
  121. .or_insert_with(|| UniqueKVOpStep {
  122. step: KVOpStep::Unseen,
  123. unique_id,
  124. });
  125. // We know that the two unique_ids must come from the same clerk,
  126. // because they are found in the same entry of applied_op.
  127. assert_eq!(unique_id.clerk_id, last_result.unique_id.clerk_id);
  128. // This is a newer request
  129. if unique_id > last_result.unique_id {
  130. last_result.unique_id = unique_id;
  131. match &last_result.step {
  132. KVOpStep::Unseen => {
  133. panic!("Unseen results should never be seen.")
  134. }
  135. // Notify all threads that are still waiting that a new
  136. // request has arrived. This should never happen.
  137. KVOpStep::Pending(condvar) => {
  138. condvar.notify_all();
  139. }
  140. KVOpStep::Done(_) => {}
  141. }
  142. last_result.step = KVOpStep::Unseen;
  143. }
  144. // Now we know unique_id <= last_result.unique_id.
  145. assert!(unique_id <= last_result.unique_id);
  146. match &last_result.step {
  147. KVOpStep::Unseen => {
  148. let condvar = Arc::new(Condvar::new());
  149. last_result.step = KVOpStep::Pending(condvar.clone());
  150. (true, condvar)
  151. }
  152. // The operation is still pending.
  153. KVOpStep::Pending(condvar) => (false, condvar.clone()),
  154. // The operation is a Get
  155. KVOpStep::Done(CommitResult::Get(value)) => {
  156. return if unique_id == last_result.unique_id {
  157. // This is the same operation as the last one
  158. Ok(CommitResult::Get(value.clone()))
  159. } else {
  160. // A past Get operation is being retried. We do not
  161. // know the proper value to return.
  162. Err(CommitError::Expired(unique_id))
  163. };
  164. }
  165. // For Put & Append operations, all we know is that all past
  166. // operations must have been committed, returning OK.
  167. KVOpStep::Done(result) => return Ok(result.clone()),
  168. }
  169. };
  170. if unseen {
  171. let op = UniqueKVOp { op, unique_id };
  172. if self.rf.start(op).is_none() {
  173. return Err(CommitError::NotLeader);
  174. }
  175. }
  176. let mut state = self.state.lock();
  177. // Wait for the op to be comitted.
  178. condvar.wait_for(&mut state, timeout);
  179. let step = state
  180. .applied_op
  181. .get(&unique_id.clerk_id)
  182. .expect("Clerk entry should have been inserted.");
  183. if unique_id != step.unique_id {
  184. // The clerk must have seen the result of this request because they
  185. // are sending in a new one. Just return error.
  186. return Err(CommitError::Expired(unique_id));
  187. }
  188. return if let KVOpStep::Done(result) = &step.step {
  189. if unseen {
  190. Ok(result.clone())
  191. } else {
  192. Err(CommitError::Duplicate(result.clone()))
  193. }
  194. } else {
  195. Err(CommitError::TimedOut)
  196. };
  197. }
  198. const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2);
  199. pub fn get(&self, args: GetArgs) -> GetReply {
  200. let (is_retry, result) = match self.block_for_commit(
  201. args.unique_id,
  202. KVOp::Get(GetOp { key: args.key }),
  203. Self::DEFAULT_TIMEOUT,
  204. ) {
  205. Ok(result) => (false, result),
  206. Err(CommitError::Duplicate(result)) => (true, result),
  207. Err(e) => {
  208. return GetReply {
  209. result: Err(e.into()),
  210. is_retry: false,
  211. }
  212. }
  213. };
  214. let result = match result {
  215. CommitResult::Get(result) => Ok(result),
  216. CommitResult::Put => Err(KVError::Conflict),
  217. CommitResult::Append => Err(KVError::Conflict),
  218. };
  219. GetReply { result, is_retry }
  220. }
  221. pub fn put_append(&self, args: PutAppendArgs) -> PutAppendReply {
  222. let op = PutAppendOp {
  223. key: args.key,
  224. value: args.value,
  225. };
  226. let op = match args.op {
  227. PutAppendEnum::Put => KVOp::Put(op),
  228. PutAppendEnum::Append => KVOp::Append(op),
  229. };
  230. let result = match self.block_for_commit(
  231. args.unique_id,
  232. op,
  233. Self::DEFAULT_TIMEOUT,
  234. ) {
  235. Ok(result) => result,
  236. Err(CommitError::Duplicate(result)) => result,
  237. Err(e) => {
  238. return PutAppendReply {
  239. result: Err(e.into()),
  240. }
  241. }
  242. };
  243. let result = match result {
  244. CommitResult::Put => {
  245. if args.op == PutAppendEnum::Put {
  246. Ok(())
  247. } else {
  248. Err(KVError::Conflict)
  249. }
  250. }
  251. CommitResult::Append => {
  252. if args.op == PutAppendEnum::Append {
  253. Ok(())
  254. } else {
  255. Err(KVError::Conflict)
  256. }
  257. }
  258. CommitResult::Get(_) => Err(KVError::Conflict),
  259. };
  260. PutAppendReply { result }
  261. }
  262. pub fn kill(self) {
  263. self.shutdown.store(true, Ordering::Relaxed);
  264. self.rf.kill()
  265. }
  266. }