//! server.rs: a key/value server whose operations are replicated through Raft.
  1. use super::common::{
  2. ClerkId, GetArgs, GetReply, KVError, PutAppendArgs, PutAppendEnum,
  3. PutAppendReply, UniqueId,
  4. };
  5. use parking_lot::{Condvar, Mutex};
  6. use ruaft::{Persister, Raft, RpcClient, Term};
  7. use std::collections::hash_map::Entry;
  8. use std::collections::HashMap;
  9. use std::sync::atomic::{AtomicUsize, Ordering};
  10. use std::sync::mpsc::{channel, Receiver};
  11. use std::sync::Arc;
  12. use std::time::Duration;
  13. pub struct KVServer {
  14. me: AtomicUsize,
  15. state: Mutex<KVServerState>,
  16. rf: Mutex<Raft<UniqueKVOp>>,
  17. // snapshot
  18. }
  19. type IndexedCommand = (usize, UniqueKVOp);
  20. #[derive(Clone, Default, Serialize, Deserialize)]
  21. pub struct UniqueKVOp {
  22. op: KVOp,
  23. me: usize,
  24. unique_id: UniqueId,
  25. }
  26. #[derive(Default)]
  27. struct KVServerState {
  28. kv: HashMap<String, String>,
  29. debug_kv: HashMap<String, String>,
  30. applied_op: HashMap<ClerkId, (UniqueId, CommitResult)>,
  31. queries: HashMap<UniqueId, Arc<ResultHolder>>,
  32. }
  33. #[derive(Clone, Serialize, Deserialize)]
  34. enum KVOp {
  35. NoOp,
  36. Get(String),
  37. Put(String, String),
  38. Append(String, String),
  39. }
  40. impl Default for KVOp {
  41. fn default() -> Self {
  42. KVOp::NoOp
  43. }
  44. }
  45. struct ResultHolder {
  46. term: AtomicUsize,
  47. result: Mutex<Result<CommitResult, CommitError>>,
  48. condvar: Condvar,
  49. }
  50. #[derive(Clone, Debug)]
  51. enum CommitResult {
  52. Get(Option<String>),
  53. Put,
  54. Append,
  55. }
  56. #[derive(Clone, Debug)]
  57. enum CommitError {
  58. NotLeader,
  59. Expired(UniqueId),
  60. TimedOut,
  61. Conflict,
  62. Duplicate(CommitResult),
  63. }
  64. impl From<CommitError> for KVError {
  65. fn from(err: CommitError) -> Self {
  66. match err {
  67. CommitError::NotLeader => KVError::NotLeader,
  68. CommitError::Expired(_) => KVError::Expired,
  69. CommitError::TimedOut => KVError::TimedOut,
  70. CommitError::Conflict => KVError::Conflict,
  71. CommitError::Duplicate(_) => panic!("Duplicate is not a KVError"),
  72. }
  73. }
  74. }
  75. impl KVServer {
  76. pub fn new(
  77. servers: Vec<RpcClient>,
  78. me: usize,
  79. persister: Arc<dyn Persister>,
  80. ) -> Arc<Self> {
  81. let (tx, rx) = channel();
  82. let apply_command = move |index, command| {
  83. tx.send((index, command))
  84. .expect("The receiving end of apply command channel should have not been dropped");
  85. };
  86. let ret = Arc::new(Self {
  87. me: AtomicUsize::new(me),
  88. state: Default::default(),
  89. rf: Mutex::new(Raft::new(
  90. servers,
  91. me,
  92. persister,
  93. apply_command,
  94. None,
  95. Raft::<UniqueKVOp>::NO_SNAPSHOT,
  96. )),
  97. });
  98. ret.clone().process_command(rx);
  99. ret
  100. }
  101. fn apply_op(&self, unique_id: UniqueId, leader: usize, op: KVOp) {
  102. // The borrow checker does not allow borrowing two fields of an instance
  103. // inside a MutexGuard. But it does allow borrowing two fields of the
  104. // instance itself. Calling deref_mut() on the MutexGuard works, too!
  105. let state = &mut *self.state.lock();
  106. let (applied_op, kv) = (&mut state.applied_op, &mut state.kv);
  107. let entry = applied_op.entry(unique_id.clerk_id);
  108. if let Entry::Occupied(curr) = &entry {
  109. let (applied_unique_id, _) = curr.get();
  110. if *applied_unique_id >= unique_id {
  111. // Redelivered.
  112. // It is guaranteed that we have no pending queries with the
  113. // same unique_id, because
  114. // 1. When inserting into queries, we first check the unique_id
  115. // is strictly larger than the one in applied_op.
  116. // 2. When modifying entries in applied_op, the unique_id can
  117. // only grow larger. And we make sure there is no entries with
  118. // the same unique_id in queries.
  119. // TODO(ditsing): in case 2), make sure there is no entries in
  120. // queries that have a smaller unique_id.
  121. assert!(!state.queries.contains_key(&unique_id));
  122. return;
  123. }
  124. }
  125. let result = match op {
  126. KVOp::NoOp => return,
  127. KVOp::Get(key) => CommitResult::Get(kv.get(&key).cloned()),
  128. KVOp::Put(key, value) => {
  129. kv.insert(key, value);
  130. CommitResult::Put
  131. }
  132. KVOp::Append(key, value) => {
  133. kv.entry(key)
  134. .and_modify(|str| str.push_str(&value))
  135. .or_insert(value);
  136. CommitResult::Append
  137. }
  138. };
  139. match entry {
  140. Entry::Occupied(mut curr) => {
  141. curr.insert((unique_id, result.clone()));
  142. }
  143. Entry::Vacant(vacant) => {
  144. vacant.insert((unique_id, result.clone()));
  145. }
  146. }
  147. if let Some(result_holder) = state.queries.remove(&unique_id) {
  148. // If this KV server might not be the same leader that committed
  149. // this change. We are not sure if it is a duplicate or a conflict.
  150. // To tell the difference, the terms and operations must be stored.
  151. *result_holder.result.lock() = if leader == self.me() {
  152. Ok(result)
  153. } else {
  154. Err(CommitError::Conflict)
  155. };
  156. result_holder.condvar.notify_all();
  157. };
  158. }
  159. fn process_command(
  160. self: Arc<Self>,
  161. command_channel: Receiver<IndexedCommand>,
  162. ) {
  163. std::thread::spawn(move || {
  164. while let Ok((_, command)) = command_channel.recv() {
  165. self.apply_op(command.unique_id, command.me, command.op);
  166. }
  167. });
  168. }
  169. const UNSEEN_TERM: usize = 0;
  170. const ATTEMPTING_TERM: usize = usize::MAX;
  171. fn block_for_commit(
  172. &self,
  173. unique_id: UniqueId,
  174. op: KVOp,
  175. timeout: Duration,
  176. ) -> Result<CommitResult, CommitError> {
  177. let result_holder = {
  178. let mut state = self.state.lock();
  179. let applied = state.applied_op.get(&unique_id.clerk_id);
  180. if let Some((applied_unique_id, result)) = applied {
  181. if unique_id < *applied_unique_id {
  182. return Err(CommitError::Expired(unique_id));
  183. } else if unique_id == *applied_unique_id {
  184. return Err(CommitError::Duplicate(result.clone()));
  185. }
  186. };
  187. let entry = state.queries.entry(unique_id).or_insert_with(|| {
  188. Arc::new(ResultHolder {
  189. term: AtomicUsize::new(Self::UNSEEN_TERM),
  190. result: Mutex::new(Err(CommitError::TimedOut)),
  191. condvar: Condvar::new(),
  192. })
  193. });
  194. entry.clone()
  195. };
  196. let (Term(hold_term), is_leader) = self.rf.lock().get_state();
  197. if !is_leader {
  198. result_holder.condvar.notify_all();
  199. return Err(CommitError::NotLeader);
  200. }
  201. Self::validate_term(hold_term);
  202. let set = result_holder.term.compare_exchange(
  203. Self::UNSEEN_TERM,
  204. Self::ATTEMPTING_TERM,
  205. Ordering::SeqCst,
  206. Ordering::SeqCst,
  207. );
  208. let start = match set {
  209. // Nobody has attempted start() yet.
  210. Ok(Self::UNSEEN_TERM) => true,
  211. Ok(_) => panic!(
  212. "compare_exchange should always return the current value 0"
  213. ),
  214. // Somebody has attempted, or is attempting, start().
  215. Err(prev_term) => {
  216. prev_term != Self::ATTEMPTING_TERM && prev_term < hold_term
  217. }
  218. };
  219. if start {
  220. let op = UniqueKVOp {
  221. op,
  222. me: self.me(),
  223. unique_id,
  224. };
  225. let start = self.rf.lock().start(op);
  226. let start_term =
  227. start.map_or(Self::UNSEEN_TERM, |(Term(term), _)| {
  228. Self::validate_term(term);
  229. term
  230. });
  231. let set = result_holder.term.compare_exchange(
  232. Self::ATTEMPTING_TERM,
  233. start_term,
  234. Ordering::SeqCst,
  235. Ordering::SeqCst,
  236. );
  237. // Setting term must have been successful, and must return the
  238. // value previously set by this attempt.
  239. assert_eq!(set, Ok(Self::ATTEMPTING_TERM));
  240. if start_term == Self::UNSEEN_TERM {
  241. result_holder.condvar.notify_all();
  242. return Err(CommitError::NotLeader);
  243. }
  244. }
  245. let mut guard = result_holder.result.lock();
  246. // Wait for the op to be committed.
  247. result_holder.condvar.wait_for(&mut guard, timeout);
  248. // Copy the result out.
  249. let result = guard.clone();
  250. // If the result is OK, all other requests should see "Duplicate".
  251. if let Ok(result) = guard.clone() {
  252. *guard = Err(CommitError::Duplicate(result))
  253. }
  254. return result;
  255. }
  256. fn validate_term(term: usize) {
  257. assert!(term > Self::UNSEEN_TERM, "Term must be larger than 0.");
  258. assert!(
  259. term < Self::ATTEMPTING_TERM,
  260. "Term must be smaller than usize::MAX."
  261. );
  262. }
  263. const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2);
  264. pub fn get(&self, args: GetArgs) -> GetReply {
  265. let (is_retry, result) = match self.block_for_commit(
  266. args.unique_id,
  267. KVOp::Get(args.key),
  268. Self::DEFAULT_TIMEOUT,
  269. ) {
  270. Ok(result) => (false, result),
  271. Err(CommitError::Duplicate(result)) => (true, result),
  272. Err(e) => {
  273. return GetReply {
  274. result: Err(e.into()),
  275. is_retry: false,
  276. }
  277. }
  278. };
  279. let result = match result {
  280. CommitResult::Get(result) => Ok(result),
  281. CommitResult::Put => Err(KVError::Conflict),
  282. CommitResult::Append => Err(KVError::Conflict),
  283. };
  284. GetReply { result, is_retry }
  285. }
  286. pub fn put_append(&self, args: PutAppendArgs) -> PutAppendReply {
  287. let op = match args.op {
  288. PutAppendEnum::Put => KVOp::Put(args.key, args.value),
  289. PutAppendEnum::Append => KVOp::Append(args.key, args.value),
  290. };
  291. let result = match self.block_for_commit(
  292. args.unique_id,
  293. op,
  294. Self::DEFAULT_TIMEOUT,
  295. ) {
  296. Ok(result) => result,
  297. Err(CommitError::Duplicate(result)) => result,
  298. Err(e) => {
  299. return PutAppendReply {
  300. result: Err(e.into()),
  301. }
  302. }
  303. };
  304. let result = match result {
  305. CommitResult::Put => {
  306. if args.op == PutAppendEnum::Put {
  307. Ok(())
  308. } else {
  309. Err(KVError::Conflict)
  310. }
  311. }
  312. CommitResult::Append => {
  313. if args.op == PutAppendEnum::Append {
  314. Ok(())
  315. } else {
  316. Err(KVError::Conflict)
  317. }
  318. }
  319. CommitResult::Get(_) => Err(KVError::Conflict),
  320. };
  321. PutAppendReply { result }
  322. }
  323. pub fn me(&self) -> usize {
  324. self.me.load(Ordering::Relaxed)
  325. }
  326. pub fn raft(&self) -> Raft<UniqueKVOp> {
  327. self.rf.lock().clone()
  328. }
  329. pub fn kill(self) {
  330. self.rf.into_inner().kill()
  331. // The process_command thread will exit, after Raft drops the reference
  332. // to the sender.
  333. }
  334. }