server.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. use std::collections::hash_map::Entry;
  2. use std::collections::HashMap;
  3. use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
  4. use std::sync::mpsc::{channel, Receiver};
  5. use std::sync::Arc;
  6. use std::time::Duration;
  7. use parking_lot::{Condvar, Mutex};
  8. use serde_derive::{Deserialize, Serialize};
  9. use ruaft::{ApplyCommandMessage, Persister, Raft, RemoteRaft, Term};
  10. use test_utils::thread_local_logger::LocalLogger;
  11. use crate::common::{
  12. ClerkId, GetArgs, GetEnum, GetReply, KVError, PutAppendArgs, PutAppendEnum,
  13. PutAppendReply, UniqueId,
  14. };
  15. use crate::snapshot_holder::SnapshotHolder;
  16. pub struct KVServer {
  17. me: AtomicUsize,
  18. state: Mutex<KVServerState>,
  19. rf: Mutex<Raft<UniqueKVOp>>,
  20. keep_running: AtomicBool,
  21. logger: LocalLogger,
  22. }
  23. #[derive(Clone, Default, Serialize, Deserialize)]
  24. pub struct UniqueKVOp {
  25. op: KVOp,
  26. me: usize,
  27. unique_id: UniqueId,
  28. }
  29. #[derive(Default, Serialize, Deserialize)]
  30. struct KVServerState {
  31. kv: HashMap<String, String>,
  32. debug_kv: HashMap<String, String>,
  33. applied_op: HashMap<ClerkId, (UniqueId, CommitResult)>,
  34. #[serde(skip)]
  35. queries: HashMap<UniqueId, Arc<ResultHolder>>,
  36. }
  37. #[derive(Clone, Serialize, Deserialize)]
  38. enum KVOp {
  39. NoOp,
  40. Get(String),
  41. Put(String, String),
  42. Append(String, String),
  43. }
  44. impl Default for KVOp {
  45. fn default() -> Self {
  46. KVOp::NoOp
  47. }
  48. }
  49. struct ResultHolder {
  50. term: AtomicUsize,
  51. result: Mutex<Result<CommitResult, CommitError>>,
  52. condvar: Condvar,
  53. }
  54. #[derive(Clone, Debug, Serialize, Deserialize)]
  55. enum CommitResult {
  56. Get(Option<String>),
  57. Put,
  58. Append,
  59. }
  60. #[derive(Clone, Debug)]
  61. enum CommitError {
  62. NotLeader,
  63. Expired(UniqueId),
  64. TimedOut,
  65. #[allow(dead_code)]
  66. Conflict,
  67. NotMe(CommitResult),
  68. Duplicate(CommitResult),
  69. }
  70. impl From<CommitError> for KVError {
  71. fn from(err: CommitError) -> Self {
  72. match err {
  73. CommitError::NotLeader => KVError::NotLeader,
  74. CommitError::Expired(_) => KVError::Expired,
  75. CommitError::TimedOut => KVError::TimedOut,
  76. CommitError::Conflict => KVError::Conflict,
  77. CommitError::NotMe(_) => panic!("NotMe is not a KVError"),
  78. CommitError::Duplicate(_) => panic!("Duplicate is not a KVError"),
  79. }
  80. }
  81. }
  82. impl KVServer {
  83. pub fn new(
  84. servers: Vec<impl RemoteRaft<UniqueKVOp> + 'static>,
  85. me: usize,
  86. persister: Arc<dyn Persister>,
  87. max_state_size_bytes: Option<usize>,
  88. ) -> Arc<Self> {
  89. let (tx, rx) = channel();
  90. let apply_command = move |message| {
  91. // Ignore apply errors.
  92. let _ = tx.send(message);
  93. };
  94. let snapshot_holder = Arc::new(SnapshotHolder::default());
  95. let snapshot_holder_clone = snapshot_holder.clone();
  96. let ret = Arc::new(Self {
  97. me: AtomicUsize::new(me),
  98. state: Default::default(),
  99. rf: Mutex::new(Raft::new(
  100. servers,
  101. me,
  102. persister,
  103. apply_command,
  104. max_state_size_bytes,
  105. move |index| snapshot_holder_clone.request_snapshot(index),
  106. )),
  107. keep_running: AtomicBool::new(true),
  108. logger: LocalLogger::inherit(),
  109. });
  110. ret.process_command(snapshot_holder, rx);
  111. ret
  112. }
  113. fn apply_op(&self, unique_id: UniqueId, leader: usize, op: KVOp) {
  114. // The borrow checker does not allow borrowing two fields of an instance
  115. // inside a MutexGuard. But it does allow borrowing two fields of the
  116. // instance itself. Calling deref_mut() on the MutexGuard works, too!
  117. let state = &mut *self.state.lock();
  118. let (applied_op, kv) = (&mut state.applied_op, &mut state.kv);
  119. let entry = applied_op.entry(unique_id.clerk_id);
  120. if let Entry::Occupied(curr) = &entry {
  121. let (applied_unique_id, _) = curr.get();
  122. if *applied_unique_id >= unique_id {
  123. // Redelivered.
  124. // It is guaranteed that we have no pending queries with the
  125. // same unique_id, because
  126. // 1. When inserting into queries, we first check the unique_id
  127. // is strictly larger than the one in applied_op.
  128. // 2. When modifying entries in applied_op, the unique_id can
  129. // only grow larger. And we make sure there is no entries with
  130. // the same unique_id in queries.
  131. // TODO(ditsing): in case 2), make sure there is no entries in
  132. // queries that have a smaller unique_id.
  133. assert!(!state.queries.contains_key(&unique_id));
  134. return;
  135. }
  136. }
  137. let result = match op {
  138. KVOp::NoOp => return,
  139. KVOp::Get(key) => CommitResult::Get(kv.get(&key).cloned()),
  140. KVOp::Put(key, value) => {
  141. kv.insert(key, value);
  142. CommitResult::Put
  143. }
  144. KVOp::Append(key, value) => {
  145. kv.entry(key)
  146. .and_modify(|str| str.push_str(&value))
  147. .or_insert(value);
  148. CommitResult::Append
  149. }
  150. };
  151. match entry {
  152. Entry::Occupied(mut curr) => {
  153. curr.insert((unique_id, result.clone()));
  154. }
  155. Entry::Vacant(vacant) => {
  156. vacant.insert((unique_id, result.clone()));
  157. }
  158. }
  159. if let Some(result_holder) = state.queries.remove(&unique_id) {
  160. // This KV server might not be the same leader that committed the
  161. // query. We are not sure if it is a duplicate or a conflict. To
  162. // tell the difference, terms of all queries must be stored.
  163. *result_holder.result.lock() = if leader == self.me() {
  164. Ok(result)
  165. } else {
  166. Err(CommitError::NotMe(result))
  167. };
  168. result_holder.condvar.notify_all();
  169. };
  170. }
  171. fn restore_state(&self, mut new_state: KVServerState) {
  172. let mut state = self.state.lock();
  173. // Cleanup all existing queries.
  174. for result_holder in state.queries.values() {
  175. *result_holder.result.lock() = Err(CommitError::NotLeader);
  176. result_holder.condvar.notify_all();
  177. }
  178. std::mem::swap(&mut new_state, &mut *state);
  179. }
  180. fn process_command(
  181. self: &Arc<Self>,
  182. snapshot_holder: Arc<SnapshotHolder<KVServerState>>,
  183. command_channel: Receiver<ApplyCommandMessage<UniqueKVOp>>,
  184. ) {
  185. let this = Arc::downgrade(self);
  186. let logger = LocalLogger::inherit();
  187. let me = self.me();
  188. std::thread::spawn(move || {
  189. logger.attach();
  190. log::info!("KVServer {} waiting for commands ...", me);
  191. while let Ok(message) = command_channel.recv() {
  192. if let Some(this) = this.upgrade() {
  193. match message {
  194. ApplyCommandMessage::Snapshot(snapshot) => {
  195. let state = snapshot_holder.load_snapshot(snapshot);
  196. this.restore_state(state);
  197. }
  198. ApplyCommandMessage::Command(index, command) => {
  199. this.apply_op(
  200. command.unique_id,
  201. command.me,
  202. command.op,
  203. );
  204. if let Some(snapshot) = snapshot_holder
  205. .take_snapshot(&this.state.lock(), index)
  206. {
  207. this.rf.lock().save_snapshot(snapshot);
  208. }
  209. }
  210. }
  211. } else {
  212. break;
  213. }
  214. }
  215. log::info!("KVServer {} stopped waiting for commands.", me);
  216. });
  217. }
  218. const UNSEEN_TERM: usize = 0;
  219. const ATTEMPTING_TERM: usize = usize::MAX;
  220. fn block_for_commit(
  221. &self,
  222. unique_id: UniqueId,
  223. op: KVOp,
  224. timeout: Duration,
  225. ) -> Result<CommitResult, CommitError> {
  226. if !self.keep_running.load(Ordering::SeqCst) {
  227. return Err(CommitError::NotLeader);
  228. }
  229. let result_holder = {
  230. let mut state = self.state.lock();
  231. let applied = state.applied_op.get(&unique_id.clerk_id);
  232. if let Some((applied_unique_id, result)) = applied {
  233. #[allow(clippy::comparison_chain)]
  234. if unique_id < *applied_unique_id {
  235. return Err(CommitError::Expired(unique_id));
  236. } else if unique_id == *applied_unique_id {
  237. return Err(CommitError::Duplicate(result.clone()));
  238. }
  239. };
  240. let entry = state.queries.entry(unique_id).or_insert_with(|| {
  241. Arc::new(ResultHolder {
  242. term: AtomicUsize::new(Self::UNSEEN_TERM),
  243. result: Mutex::new(Err(CommitError::TimedOut)),
  244. condvar: Condvar::new(),
  245. })
  246. });
  247. entry.clone()
  248. };
  249. let (Term(hold_term), is_leader) = self.rf.lock().get_state();
  250. if !is_leader {
  251. result_holder.condvar.notify_all();
  252. return Err(CommitError::NotLeader);
  253. }
  254. Self::validate_term(hold_term);
  255. let set = result_holder.term.compare_exchange(
  256. Self::UNSEEN_TERM,
  257. Self::ATTEMPTING_TERM,
  258. Ordering::SeqCst,
  259. Ordering::SeqCst,
  260. );
  261. let start = match set {
  262. // Nobody has attempted start() yet.
  263. Ok(Self::UNSEEN_TERM) => true,
  264. Ok(_) => panic!(
  265. "compare_exchange should always return the current value 0"
  266. ),
  267. // Somebody is attempting start().
  268. Err(Self::ATTEMPTING_TERM) => false,
  269. // Somebody has attempted start().
  270. Err(prev_term) if prev_term < hold_term => {
  271. let set = result_holder.term.compare_exchange(
  272. prev_term,
  273. Self::ATTEMPTING_TERM,
  274. Ordering::SeqCst,
  275. Ordering::SeqCst,
  276. );
  277. set.is_ok()
  278. }
  279. _ => false,
  280. };
  281. if start {
  282. let op = UniqueKVOp {
  283. op,
  284. me: self.me(),
  285. unique_id,
  286. };
  287. let start = self.rf.lock().start(op);
  288. let start_term =
  289. start.map_or(Self::UNSEEN_TERM, |(Term(term), _)| {
  290. Self::validate_term(term);
  291. term
  292. });
  293. let set = result_holder.term.compare_exchange(
  294. Self::ATTEMPTING_TERM,
  295. start_term,
  296. Ordering::SeqCst,
  297. Ordering::SeqCst,
  298. );
  299. // Setting term must have been successful, and must return the
  300. // value previously set by this attempt.
  301. assert_eq!(set, Ok(Self::ATTEMPTING_TERM));
  302. if start_term == Self::UNSEEN_TERM {
  303. result_holder.condvar.notify_all();
  304. return Err(CommitError::NotLeader);
  305. }
  306. }
  307. let mut guard = result_holder.result.lock();
  308. // Wait for the op to be committed.
  309. result_holder.condvar.wait_for(&mut guard, timeout);
  310. // Copy the result out.
  311. let result = guard.clone();
  312. // If the result is OK, all other requests should see "Duplicate".
  313. if let Ok(result) = guard.clone() {
  314. *guard = Err(CommitError::Duplicate(result))
  315. }
  316. result
  317. }
  318. fn validate_term(term: usize) {
  319. assert!(term > Self::UNSEEN_TERM, "Term must be larger than 0.");
  320. assert!(
  321. term < Self::ATTEMPTING_TERM,
  322. "Term must be smaller than usize::MAX."
  323. );
  324. }
  325. const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
  326. pub fn get(&self, args: GetArgs) -> GetReply {
  327. self.logger.clone().attach();
  328. let map_dup = match args.op {
  329. GetEnum::AllowDuplicate => |r| Ok(r),
  330. GetEnum::NoDuplicate => |_| Err(KVError::Conflict),
  331. };
  332. let result = match self.block_for_commit(
  333. args.unique_id,
  334. KVOp::Get(args.key),
  335. Self::DEFAULT_TIMEOUT,
  336. ) {
  337. Ok(result) => Ok(result),
  338. Err(CommitError::Duplicate(result)) => map_dup(result),
  339. Err(CommitError::NotMe(result)) => map_dup(result),
  340. Err(e) => Err(e.into()),
  341. };
  342. let result = match result {
  343. Ok(result) => result,
  344. Err(e) => return GetReply { result: Err(e) },
  345. };
  346. let result = match result {
  347. CommitResult::Get(result) => Ok(result),
  348. CommitResult::Put => Err(KVError::Conflict),
  349. CommitResult::Append => Err(KVError::Conflict),
  350. };
  351. GetReply { result }
  352. }
  353. pub fn put_append(&self, args: PutAppendArgs) -> PutAppendReply {
  354. self.logger.clone().attach();
  355. let op = match args.op {
  356. PutAppendEnum::Put => KVOp::Put(args.key, args.value),
  357. PutAppendEnum::Append => KVOp::Append(args.key, args.value),
  358. };
  359. let result = match self.block_for_commit(
  360. args.unique_id,
  361. op,
  362. Self::DEFAULT_TIMEOUT,
  363. ) {
  364. Ok(result) => result,
  365. Err(CommitError::Duplicate(result)) => result,
  366. Err(CommitError::NotMe(result)) => result,
  367. Err(e) => {
  368. return PutAppendReply {
  369. result: Err(e.into()),
  370. }
  371. }
  372. };
  373. let result = match result {
  374. CommitResult::Put => {
  375. if args.op == PutAppendEnum::Put {
  376. Ok(())
  377. } else {
  378. Err(KVError::Conflict)
  379. }
  380. }
  381. CommitResult::Append => {
  382. if args.op == PutAppendEnum::Append {
  383. Ok(())
  384. } else {
  385. Err(KVError::Conflict)
  386. }
  387. }
  388. CommitResult::Get(_) => Err(KVError::Conflict),
  389. };
  390. PutAppendReply { result }
  391. }
  392. pub fn me(&self) -> usize {
  393. self.me.load(Ordering::Relaxed)
  394. }
  395. pub fn raft(&self) -> Raft<UniqueKVOp> {
  396. self.rf.lock().clone()
  397. }
  398. pub fn kill(self: Arc<Self>) {
  399. // Return error to new queries.
  400. self.keep_running.store(false, Ordering::SeqCst);
  401. // Cancel all in-flight queries.
  402. for result_holder in self.state.lock().queries.values() {
  403. *result_holder.result.lock() = Err(CommitError::NotLeader);
  404. result_holder.condvar.notify_all();
  405. }
  406. let rf = self.raft();
  407. // We must drop self to remove the only clone of raft, so that
  408. // `rf.kill()` does not block.
  409. drop(self);
  410. rf.kill();
  411. // The process_command thread will exit, after Raft drops the reference
  412. // to the sender.
  413. }
  414. }