server.rs 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. use super::common::{ClerkId, UniqueId};
  2. use crate::kvraft::common::{GetArgs, GetReply, KVError};
  3. use parking_lot::{Condvar, Mutex};
  4. use ruaft::{Persister, Raft, RpcClient};
  5. use std::collections::HashMap;
  6. use std::sync::atomic::{AtomicBool, Ordering};
  7. use std::sync::mpsc::{channel, Receiver};
  8. use std::sync::Arc;
  9. use std::time::Duration;
/// A key-value server whose operations are ordered by a Raft peer.
// NOTE(review): fields are dropped in declaration order, so `rf` is dropped
// before `command_channel`; keep this order unless the Raft shutdown path is
// checked not to call the apply callback after the receiver is gone.
struct KVServer {
    /// The key-value map plus per-clerk request bookkeeping, behind one mutex.
    state: Mutex<KVServerState>,
    /// The Raft peer that every `UniqueKVOp` is submitted to.
    rf: Raft<UniqueKVOp>,
    /// Receives the `(index, command)` pairs that the apply callback built in
    /// `new` forwards from Raft.
    command_channel: Receiver<(usize, UniqueKVOp)>,
    /// Set to `true` by `kill`.
    shutdown: AtomicBool,
    // snapshot: placeholder; no snapshot state exists yet (Raft is created
    // with `NO_SNAPSHOT` in `new`).
}
// NOTE(review): this type derives Serialize/Deserialize; with order-sensitive
// encodings, reordering the fields would change the encoded form, so keep the
// field order stable.
/// A `KVOp` tagged with the id of the clerk request that produced it. This is
/// the command type handed to `Raft::start`.
#[derive(Clone, Default, Serialize, Deserialize)]
struct UniqueKVOp {
    /// The operation itself.
    op: KVOp,
    /// The request id; `unique_id.clerk_id` names the issuing clerk.
    unique_id: UniqueId,
}
  22. #[derive(Default)]
  23. struct KVServerState {
  24. kv: HashMap<String, String>,
  25. debug_kv: HashMap<String, String>,
  26. applied_op: HashMap<ClerkId, UniqueKVOpStep>,
  27. }
// NOTE(review): this type derives Serialize/Deserialize; index-based encodings
// depend on variant order, so keep the variants in this order.
/// A key-value operation, submitted to Raft wrapped in a `UniqueKVOp`.
#[derive(Clone, Serialize, Deserialize)]
enum KVOp {
    /// No operation; this is also `KVOp::default()`.
    NoOp,
    /// A lookup; see `GetOp`.
    Get(GetOp),
    /// A put; see `PutAppendOp`.
    Put(PutAppendOp),
    /// An append; see `PutAppendOp`.
    Append(PutAppendOp),
}
  35. impl Default for KVOp {
  36. fn default() -> Self {
  37. KVOp::NoOp
  38. }
  39. }
/// Payload of `KVOp::Get`.
// NOTE(review): serialized type; keep field order stable.
#[derive(Clone, Serialize, Deserialize)]
struct GetOp {
    /// The key to look up (taken from `GetArgs::key` in `KVServer::get`).
    key: String,
}
/// Payload shared by `KVOp::Put` and `KVOp::Append`.
// NOTE(review): serialized type; keep field order stable.
#[derive(Clone, Serialize, Deserialize)]
struct PutAppendOp {
    /// The key being written.
    key: String,
    /// The value to put or append.
    value: String,
}
  49. struct UniqueKVOpStep {
  50. step: KVOpStep,
  51. unique_id: UniqueId,
  52. }
/// Progress of a clerk's most recent request on this server.
enum KVOpStep {
    /// Known to this server but not yet submitted to Raft.
    Unseen,
    /// Submitted; callers waiting for the result block on this condition
    /// variable.
    Pending(Arc<Condvar>),
    /// Completed; the stored result answers later calls for the request.
    Done(CommitResult),
}
  58. #[derive(Clone, Debug)]
  59. enum CommitResult {
  60. Get(Option<String>),
  61. Put,
  62. Append,
  63. }
  64. #[derive(Debug)]
  65. enum CommitError {
  66. NotLeader,
  67. Expired(UniqueId),
  68. TimedOut,
  69. Conflict,
  70. Duplicate(CommitResult),
  71. }
  72. impl From<CommitError> for KVError {
  73. fn from(err: CommitError) -> Self {
  74. match err {
  75. CommitError::NotLeader => KVError::NotLeader,
  76. CommitError::Expired(_) => KVError::Expired,
  77. CommitError::TimedOut => KVError::TimedOut,
  78. CommitError::Conflict => KVError::Conflict,
  79. CommitError::Duplicate(_) => panic!("Duplicate is not a KVError"),
  80. }
  81. }
  82. }
  83. impl KVServer {
  84. pub fn new(
  85. servers: Vec<RpcClient>,
  86. me: usize,
  87. persister: Arc<dyn Persister>,
  88. ) -> Self {
  89. let (tx, rx) = channel();
  90. let apply_command = move |index, command| {
  91. tx.send((index, command))
  92. .expect("The receiving end of apply command channel should have not been dropped");
  93. };
  94. Self {
  95. state: Default::default(),
  96. rf: Raft::new(
  97. servers,
  98. me,
  99. persister,
  100. apply_command,
  101. None,
  102. Raft::<UniqueKVOp>::NO_SNAPSHOT,
  103. ),
  104. command_channel: rx,
  105. shutdown: AtomicBool::new(false),
  106. }
  107. }
  108. fn block_for_commit(
  109. &self,
  110. unique_id: UniqueId,
  111. op: KVOp,
  112. timeout: Duration,
  113. ) -> Result<CommitResult, CommitError> {
  114. let (unseen, condvar) = {
  115. let mut state = self.state.lock();
  116. let last_result = state
  117. .applied_op
  118. .entry(unique_id.clerk_id)
  119. .or_insert_with(|| UniqueKVOpStep {
  120. step: KVOpStep::Unseen,
  121. unique_id,
  122. });
  123. // We know that the two unique_ids must come from the same clerk,
  124. // because they are found in the same entry of applied_op.
  125. assert_eq!(unique_id.clerk_id, last_result.unique_id.clerk_id);
  126. // This is a newer request
  127. if unique_id > last_result.unique_id {
  128. last_result.unique_id = unique_id;
  129. match &last_result.step {
  130. KVOpStep::Unseen => {
  131. panic!("Unseen results should never be seen.")
  132. }
  133. // Notify all threads that are still waiting that a new
  134. // request has arrived. This should never happen.
  135. KVOpStep::Pending(condvar) => {
  136. condvar.notify_all();
  137. }
  138. KVOpStep::Done(_) => {}
  139. }
  140. last_result.step = KVOpStep::Unseen;
  141. }
  142. // Now we know unique_id <= last_result.unique_id.
  143. assert!(unique_id <= last_result.unique_id);
  144. match &last_result.step {
  145. KVOpStep::Unseen => {
  146. let condvar = Arc::new(Condvar::new());
  147. last_result.step = KVOpStep::Pending(condvar.clone());
  148. (true, condvar)
  149. }
  150. // The operation is still pending.
  151. KVOpStep::Pending(condvar) => (false, condvar.clone()),
  152. // The operation is a Get
  153. KVOpStep::Done(CommitResult::Get(value)) => {
  154. return if unique_id == last_result.unique_id {
  155. // This is the same operation as the last one
  156. Ok(CommitResult::Get(value.clone()))
  157. } else {
  158. // A past Get operation is being retried. We do not
  159. // know the proper value to return.
  160. Err(CommitError::Expired(unique_id))
  161. };
  162. }
  163. // For Put & Append operations, all we know is that all past
  164. // operations must have been committed, returning OK.
  165. KVOpStep::Done(result) => return Ok(result.clone()),
  166. }
  167. };
  168. if unseen {
  169. let op = UniqueKVOp { op, unique_id };
  170. if self.rf.start(op).is_none() {
  171. return Err(CommitError::NotLeader);
  172. }
  173. }
  174. let mut state = self.state.lock();
  175. // Wait for the op to be comitted.
  176. condvar.wait_for(&mut state, timeout);
  177. let step = state
  178. .applied_op
  179. .get(&unique_id.clerk_id)
  180. .expect("Clerk entry should have been inserted.");
  181. if unique_id != step.unique_id {
  182. // The clerk must have seen the result of this request because they
  183. // are sending in a new one. Just return error.
  184. return Err(CommitError::Expired(unique_id));
  185. }
  186. return if let KVOpStep::Done(result) = &step.step {
  187. if unseen {
  188. Ok(result.clone())
  189. } else {
  190. Err(CommitError::Duplicate(result.clone()))
  191. }
  192. } else {
  193. Err(CommitError::TimedOut)
  194. };
  195. }
  196. const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2);
  197. pub fn get(&self, args: GetArgs) -> GetReply {
  198. let (is_retry, result) = match self.block_for_commit(
  199. args.unique_id,
  200. KVOp::Get(GetOp { key: args.key }),
  201. Self::DEFAULT_TIMEOUT,
  202. ) {
  203. Ok(result) => (false, result),
  204. Err(CommitError::Duplicate(result)) => (true, result),
  205. Err(e) => {
  206. return GetReply {
  207. result: Err(e.into()),
  208. is_retry: false,
  209. }
  210. }
  211. };
  212. let result = match result {
  213. CommitResult::Get(result) => Ok(result),
  214. CommitResult::Put => Err(KVError::Conflict),
  215. CommitResult::Append => Err(KVError::Conflict),
  216. };
  217. GetReply { result, is_retry }
  218. }
  219. pub fn kill(self) {
  220. self.shutdown.store(true, Ordering::Relaxed);
  221. self.rf.kill()
  222. }
  223. }