// server.rs (5.9 KB)
  1. use super::common::{ClerkId, UniqueId};
  2. use parking_lot::{Condvar, Mutex};
  3. use ruaft::{Persister, Raft, RpcClient};
  4. use std::collections::HashMap;
  5. use std::sync::atomic::{AtomicBool, Ordering};
  6. use std::sync::mpsc::{channel, Receiver};
  7. use std::sync::Arc;
  8. use std::time::Duration;
  9. struct KVServer {
  10. state: Mutex<KVServerState>,
  11. rf: Raft<UniqueKVOp>,
  12. command_channel: Receiver<(usize, UniqueKVOp)>,
  13. shutdown: AtomicBool,
  14. // snapshot
  15. }
  16. #[derive(Clone, Default, Serialize, Deserialize)]
  17. struct UniqueKVOp {
  18. op: KVOp,
  19. unique_id: UniqueId,
  20. }
  21. #[derive(Default)]
  22. struct KVServerState {
  23. kv: HashMap<String, String>,
  24. debug_kv: HashMap<String, String>,
  25. applied_op: HashMap<ClerkId, UniqueKVOpStep>,
  26. }
  27. #[derive(Clone, Serialize, Deserialize)]
  28. enum KVOp {
  29. NoOp,
  30. Get(GetOp),
  31. Put(PutAppendOp),
  32. Append(PutAppendOp),
  33. }
  34. impl Default for KVOp {
  35. fn default() -> Self {
  36. KVOp::NoOp
  37. }
  38. }
  39. #[derive(Clone, Serialize, Deserialize)]
  40. struct GetOp {
  41. key: String,
  42. }
  43. #[derive(Clone, Serialize, Deserialize)]
  44. struct PutAppendOp {
  45. key: String,
  46. value: String,
  47. }
  48. struct UniqueKVOpStep {
  49. step: KVOpStep,
  50. unique_id: UniqueId,
  51. }
  52. enum KVOpStep {
  53. Unseen,
  54. Pending(Arc<Condvar>),
  55. Done(CommitResult),
  56. }
  /// The outcome of a committed operation.
  #[derive(Clone)]
  enum CommitResult {
      /// Result of a `Get`, carrying a string value.
      Get(String),
      /// Result of a `Put`.
      Put,
      /// Result of an `Append`.
      Append,
  }
  63. enum CommitError {
  64. NotLeader,
  65. Expired(UniqueId),
  66. Timeout,
  67. Conflict,
  68. Duplicate(CommitResult),
  69. }
  70. impl KVServer {
  71. pub fn new(
  72. servers: Vec<RpcClient>,
  73. me: usize,
  74. persister: Arc<dyn Persister>,
  75. ) -> Self {
  76. let (tx, rx) = channel();
  77. let apply_command = move |index, command| {
  78. tx.send((index, command))
  79. .expect("The receiving end of apply command channel should have not been dropped");
  80. };
  81. Self {
  82. state: Default::default(),
  83. rf: Raft::new(
  84. servers,
  85. me,
  86. persister,
  87. apply_command,
  88. None,
  89. Raft::<UniqueKVOp>::NO_SNAPSHOT,
  90. ),
  91. command_channel: rx,
  92. shutdown: AtomicBool::new(false),
  93. }
  94. }
  95. fn block_for_commit(
  96. &self,
  97. unique_id: UniqueId,
  98. op: KVOp,
  99. timeout: Duration,
  100. ) -> Result<CommitResult, CommitError> {
  101. let (unseen, condvar) = {
  102. let mut state = self.state.lock();
  103. let last_result = state
  104. .applied_op
  105. .entry(unique_id.clerk_id)
  106. .or_insert_with(|| UniqueKVOpStep {
  107. step: KVOpStep::Unseen,
  108. unique_id,
  109. });
  110. // We know that the two unique_ids must come from the same clerk,
  111. // because they are found in the same entry of applied_op.
  112. assert_eq!(unique_id.clerk_id, last_result.unique_id.clerk_id);
  113. // This is a newer request
  114. if unique_id > last_result.unique_id {
  115. last_result.unique_id = unique_id;
  116. match &last_result.step {
  117. KVOpStep::Unseen => {
  118. panic!("Unseen results should never be seen.")
  119. }
  120. // Notify all threads that are still waiting that a new
  121. // request has arrived. This should never happen.
  122. KVOpStep::Pending(condvar) => {
  123. condvar.notify_all();
  124. }
  125. KVOpStep::Done(_) => {}
  126. }
  127. last_result.step = KVOpStep::Unseen;
  128. }
  129. // Now we know unique_id <= last_result.unique_id.
  130. assert!(unique_id <= last_result.unique_id);
  131. match &last_result.step {
  132. KVOpStep::Unseen => {
  133. let condvar = Arc::new(Condvar::new());
  134. last_result.step = KVOpStep::Pending(condvar.clone());
  135. (true, condvar)
  136. }
  137. // The operation is still pending.
  138. KVOpStep::Pending(condvar) => (false, condvar.clone()),
  139. // The operation is a Get
  140. KVOpStep::Done(CommitResult::Get(value)) => {
  141. return if unique_id == last_result.unique_id {
  142. // This is the same operation as the last one
  143. Ok(CommitResult::Get(value.clone()))
  144. } else {
  145. // A past Get operation is being retried. We do not
  146. // know the proper value to return.
  147. Err(CommitError::Expired(unique_id))
  148. };
  149. }
  150. // For Put & Append operations, all we know is that all past
  151. // operations must have been committed, returning OK.
  152. KVOpStep::Done(result) => return Ok(result.clone()),
  153. }
  154. };
  155. if unseen {
  156. let op = UniqueKVOp { op, unique_id };
  157. if self.rf.start(op).is_none() {
  158. return Err(CommitError::NotLeader);
  159. }
  160. }
  161. let mut state = self.state.lock();
  162. // Wait for the op to be comitted.
  163. condvar.wait_for(&mut state, timeout);
  164. let step = state
  165. .applied_op
  166. .get(&unique_id.clerk_id)
  167. .expect("Clerk entry should have been inserted.");
  168. if unique_id != step.unique_id {
  169. // The clerk must have seen the result of this request because they
  170. // are sending in a new one. Just return error.
  171. return Err(CommitError::Expired(unique_id));
  172. }
  173. return if let KVOpStep::Done(result) = &step.step {
  174. if unseen {
  175. Ok(result.clone())
  176. } else {
  177. Err(CommitError::Duplicate(result.clone()))
  178. }
  179. } else {
  180. Err(CommitError::Timeout)
  181. };
  182. }
  183. pub fn kill(self) {
  184. self.shutdown.store(true, Ordering::Relaxed);
  185. self.rf.kill()
  186. }
  187. }