server.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. use std::collections::hash_map::Entry;
  2. use std::collections::HashMap;
  3. use std::sync::atomic::{AtomicUsize, Ordering};
  4. use std::sync::mpsc::{channel, Receiver};
  5. use std::sync::Arc;
  6. use std::time::Duration;
  7. use parking_lot::{Condvar, Mutex};
  8. use ruaft::{Persister, Raft, RpcClient, Term};
  9. use crate::common::{
  10. ClerkId, GetArgs, GetReply, KVError, PutAppendArgs, PutAppendEnum,
  11. PutAppendReply, UniqueId,
  12. };
  13. pub struct KVServer {
  14. me: AtomicUsize,
  15. state: Mutex<KVServerState>,
  16. rf: Mutex<Raft<UniqueKVOp>>,
  17. // snapshot
  18. }
  19. type IndexedCommand = (usize, UniqueKVOp);
  20. #[derive(Clone, Default, Serialize, Deserialize)]
  21. pub struct UniqueKVOp {
  22. op: KVOp,
  23. me: usize,
  24. unique_id: UniqueId,
  25. }
  26. #[derive(Default)]
  27. struct KVServerState {
  28. kv: HashMap<String, String>,
  29. debug_kv: HashMap<String, String>,
  30. applied_op: HashMap<ClerkId, (UniqueId, CommitResult)>,
  31. queries: HashMap<UniqueId, Arc<ResultHolder>>,
  32. }
  33. #[derive(Clone, Serialize, Deserialize)]
  34. enum KVOp {
  35. NoOp,
  36. Get(String),
  37. Put(String, String),
  38. Append(String, String),
  39. }
  40. impl Default for KVOp {
  41. fn default() -> Self {
  42. KVOp::NoOp
  43. }
  44. }
  45. struct ResultHolder {
  46. term: AtomicUsize,
  47. result: Mutex<Result<CommitResult, CommitError>>,
  48. condvar: Condvar,
  49. }
  /// Result of an operation that has been applied to the key-value data.
  #[derive(Clone, Debug)]
  enum CommitResult {
      /// Value found under the requested key, or `None` when the key is absent.
      Get(Option<String>),
      /// A put was applied.
      Put,
      /// An append was applied.
      Append,
  }
  56. #[derive(Clone, Debug)]
  57. enum CommitError {
  58. NotLeader,
  59. Expired(UniqueId),
  60. TimedOut,
  61. Conflict,
  62. NotMe(CommitResult),
  63. Duplicate(CommitResult),
  64. }
  65. impl From<CommitError> for KVError {
  66. fn from(err: CommitError) -> Self {
  67. match err {
  68. CommitError::NotLeader => KVError::NotLeader,
  69. CommitError::Expired(_) => KVError::Expired,
  70. CommitError::TimedOut => KVError::TimedOut,
  71. CommitError::Conflict => KVError::Conflict,
  72. CommitError::NotMe(_) => panic!("NotMe is not a KVError"),
  73. CommitError::Duplicate(_) => panic!("Duplicate is not a KVError"),
  74. }
  75. }
  76. }
  77. impl KVServer {
  78. pub fn new(
  79. servers: Vec<RpcClient>,
  80. me: usize,
  81. persister: Arc<dyn Persister>,
  82. ) -> Arc<Self> {
  83. let (tx, rx) = channel();
  84. let apply_command = move |index, command| {
  85. tx.send((index, command))
  86. .expect("The receiving end of apply command channel should have not been dropped");
  87. };
  88. let ret = Arc::new(Self {
  89. me: AtomicUsize::new(me),
  90. state: Default::default(),
  91. rf: Mutex::new(Raft::new(
  92. servers,
  93. me,
  94. persister,
  95. apply_command,
  96. None,
  97. Raft::<UniqueKVOp>::NO_SNAPSHOT,
  98. )),
  99. });
  100. ret.process_command(rx);
  101. ret
  102. }
  103. fn apply_op(&self, unique_id: UniqueId, leader: usize, op: KVOp) {
  104. // The borrow checker does not allow borrowing two fields of an instance
  105. // inside a MutexGuard. But it does allow borrowing two fields of the
  106. // instance itself. Calling deref_mut() on the MutexGuard works, too!
  107. let state = &mut *self.state.lock();
  108. let (applied_op, kv) = (&mut state.applied_op, &mut state.kv);
  109. let entry = applied_op.entry(unique_id.clerk_id);
  110. if let Entry::Occupied(curr) = &entry {
  111. let (applied_unique_id, _) = curr.get();
  112. if *applied_unique_id >= unique_id {
  113. // Redelivered.
  114. // It is guaranteed that we have no pending queries with the
  115. // same unique_id, because
  116. // 1. When inserting into queries, we first check the unique_id
  117. // is strictly larger than the one in applied_op.
  118. // 2. When modifying entries in applied_op, the unique_id can
  119. // only grow larger. And we make sure there is no entries with
  120. // the same unique_id in queries.
  121. // TODO(ditsing): in case 2), make sure there is no entries in
  122. // queries that have a smaller unique_id.
  123. assert!(!state.queries.contains_key(&unique_id));
  124. return;
  125. }
  126. }
  127. let result = match op {
  128. KVOp::NoOp => return,
  129. KVOp::Get(key) => CommitResult::Get(kv.get(&key).cloned()),
  130. KVOp::Put(key, value) => {
  131. kv.insert(key, value);
  132. CommitResult::Put
  133. }
  134. KVOp::Append(key, value) => {
  135. kv.entry(key)
  136. .and_modify(|str| str.push_str(&value))
  137. .or_insert(value);
  138. CommitResult::Append
  139. }
  140. };
  141. match entry {
  142. Entry::Occupied(mut curr) => {
  143. curr.insert((unique_id, result.clone()));
  144. }
  145. Entry::Vacant(vacant) => {
  146. vacant.insert((unique_id, result.clone()));
  147. }
  148. }
  149. if let Some(result_holder) = state.queries.remove(&unique_id) {
  150. // If this KV server might not be the same leader that committed
  151. // this change. We are not sure if it is a duplicate or a conflict.
  152. // To tell the difference, the terms and operations must be stored.
  153. *result_holder.result.lock() = if leader == self.me() {
  154. Ok(result)
  155. } else {
  156. Err(CommitError::NotMe(result))
  157. };
  158. result_holder.condvar.notify_all();
  159. };
  160. }
  161. fn process_command(
  162. self: &Arc<Self>,
  163. command_channel: Receiver<IndexedCommand>,
  164. ) {
  165. let this = Arc::downgrade(self);
  166. std::thread::spawn(move || {
  167. while let Ok((_, command)) = command_channel.recv() {
  168. if let Some(this) = this.upgrade() {
  169. this.apply_op(command.unique_id, command.me, command.op);
  170. } else {
  171. break;
  172. }
  173. }
  174. });
  175. }
  /// `ResultHolder::term` value meaning no successful `start()` is on record.
  /// It is the initial value and is written back when `start()` fails.
  const UNSEEN_TERM: usize = 0;
  /// `ResultHolder::term` value meaning some thread is calling `start()` now.
  const ATTEMPTING_TERM: usize = usize::MAX;
  178. fn block_for_commit(
  179. &self,
  180. unique_id: UniqueId,
  181. op: KVOp,
  182. timeout: Duration,
  183. ) -> Result<CommitResult, CommitError> {
  184. let result_holder = {
  185. let mut state = self.state.lock();
  186. let applied = state.applied_op.get(&unique_id.clerk_id);
  187. if let Some((applied_unique_id, result)) = applied {
  188. if unique_id < *applied_unique_id {
  189. return Err(CommitError::Expired(unique_id));
  190. } else if unique_id == *applied_unique_id {
  191. return Err(CommitError::Duplicate(result.clone()));
  192. }
  193. };
  194. let entry = state.queries.entry(unique_id).or_insert_with(|| {
  195. Arc::new(ResultHolder {
  196. term: AtomicUsize::new(Self::UNSEEN_TERM),
  197. result: Mutex::new(Err(CommitError::TimedOut)),
  198. condvar: Condvar::new(),
  199. })
  200. });
  201. entry.clone()
  202. };
  203. let (Term(hold_term), is_leader) = self.rf.lock().get_state();
  204. if !is_leader {
  205. result_holder.condvar.notify_all();
  206. return Err(CommitError::NotLeader);
  207. }
  208. Self::validate_term(hold_term);
  209. let set = result_holder.term.compare_exchange(
  210. Self::UNSEEN_TERM,
  211. Self::ATTEMPTING_TERM,
  212. Ordering::SeqCst,
  213. Ordering::SeqCst,
  214. );
  215. let start = match set {
  216. // Nobody has attempted start() yet.
  217. Ok(Self::UNSEEN_TERM) => true,
  218. Ok(_) => panic!(
  219. "compare_exchange should always return the current value 0"
  220. ),
  221. // Somebody has attempted, or is attempting, start().
  222. Err(prev_term) => {
  223. let start =
  224. prev_term != Self::ATTEMPTING_TERM && prev_term < hold_term;
  225. if start {
  226. let set = result_holder.term.compare_exchange(
  227. prev_term,
  228. Self::ATTEMPTING_TERM,
  229. Ordering::SeqCst,
  230. Ordering::SeqCst,
  231. );
  232. set.is_ok()
  233. } else {
  234. false
  235. }
  236. }
  237. };
  238. if start {
  239. let op = UniqueKVOp {
  240. op,
  241. me: self.me(),
  242. unique_id,
  243. };
  244. let start = self.rf.lock().start(op);
  245. let start_term =
  246. start.map_or(Self::UNSEEN_TERM, |(Term(term), _)| {
  247. Self::validate_term(term);
  248. term
  249. });
  250. let set = result_holder.term.compare_exchange(
  251. Self::ATTEMPTING_TERM,
  252. start_term,
  253. Ordering::SeqCst,
  254. Ordering::SeqCst,
  255. );
  256. // Setting term must have been successful, and must return the
  257. // value previously set by this attempt.
  258. assert_eq!(set, Ok(Self::ATTEMPTING_TERM));
  259. if start_term == Self::UNSEEN_TERM {
  260. result_holder.condvar.notify_all();
  261. return Err(CommitError::NotLeader);
  262. }
  263. }
  264. let mut guard = result_holder.result.lock();
  265. // Wait for the op to be committed.
  266. result_holder.condvar.wait_for(&mut guard, timeout);
  267. // Copy the result out.
  268. let result = guard.clone();
  269. // If the result is OK, all other requests should see "Duplicate".
  270. if let Ok(result) = guard.clone() {
  271. *guard = Err(CommitError::Duplicate(result))
  272. }
  273. return result;
  274. }
  275. fn validate_term(term: usize) {
  276. assert!(term > Self::UNSEEN_TERM, "Term must be larger than 0.");
  277. assert!(
  278. term < Self::ATTEMPTING_TERM,
  279. "Term must be smaller than usize::MAX."
  280. );
  281. }
  /// How long `get` and `put_append` wait for an operation to be applied.
  const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
  283. pub fn get(&self, args: GetArgs) -> GetReply {
  284. let (is_retry, result) = match self.block_for_commit(
  285. args.unique_id,
  286. KVOp::Get(args.key),
  287. Self::DEFAULT_TIMEOUT,
  288. ) {
  289. Ok(result) => (false, result),
  290. Err(CommitError::Duplicate(result)) => (true, result),
  291. Err(CommitError::NotMe(result)) => (true, result),
  292. Err(e) => {
  293. return GetReply {
  294. result: Err(e.into()),
  295. is_retry: false,
  296. }
  297. }
  298. };
  299. let result = match result {
  300. CommitResult::Get(result) => Ok(result),
  301. CommitResult::Put => Err(KVError::Conflict),
  302. CommitResult::Append => Err(KVError::Conflict),
  303. };
  304. GetReply { result, is_retry }
  305. }
  306. pub fn put_append(&self, args: PutAppendArgs) -> PutAppendReply {
  307. let op = match args.op {
  308. PutAppendEnum::Put => KVOp::Put(args.key, args.value),
  309. PutAppendEnum::Append => KVOp::Append(args.key, args.value),
  310. };
  311. let result = match self.block_for_commit(
  312. args.unique_id,
  313. op,
  314. Self::DEFAULT_TIMEOUT,
  315. ) {
  316. Ok(result) => result,
  317. Err(CommitError::Duplicate(result)) => result,
  318. Err(CommitError::NotMe(result)) => result,
  319. Err(e) => {
  320. return PutAppendReply {
  321. result: Err(e.into()),
  322. }
  323. }
  324. };
  325. let result = match result {
  326. CommitResult::Put => {
  327. if args.op == PutAppendEnum::Put {
  328. Ok(())
  329. } else {
  330. Err(KVError::Conflict)
  331. }
  332. }
  333. CommitResult::Append => {
  334. if args.op == PutAppendEnum::Append {
  335. Ok(())
  336. } else {
  337. Err(KVError::Conflict)
  338. }
  339. }
  340. CommitResult::Get(_) => Err(KVError::Conflict),
  341. };
  342. PutAppendReply { result }
  343. }
  344. pub fn me(&self) -> usize {
  345. self.me.load(Ordering::Relaxed)
  346. }
  347. pub fn raft(&self) -> Raft<UniqueKVOp> {
  348. self.rf.lock().clone()
  349. }
  350. pub fn kill(self: Arc<Self>) {
  351. let rf = self.raft();
  352. // We must drop self to remove the only clone of raft, so that
  353. // `rf.kill()` does not block.
  354. drop(self);
  355. rf.kill();
  356. // The process_command thread will exit, after Raft drops the reference
  357. // to the sender.
  358. }
  359. }