server.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. use std::collections::hash_map::Entry;
  2. use std::collections::HashMap;
  3. use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
  4. use std::sync::mpsc::{channel, Receiver};
  5. use std::sync::Arc;
  6. use std::time::Duration;
  7. use futures::FutureExt;
  8. use parking_lot::Mutex;
  9. use serde_derive::{Deserialize, Serialize};
  10. use ruaft::{ApplyCommandMessage, Persister, Raft, RemoteRaft, Term};
  11. use test_utils::log_with;
  12. use test_utils::thread_local_logger::LocalLogger;
  13. use crate::common::{
  14. ClerkId, GetArgs, GetEnum, GetReply, KVError, PutAppendArgs, PutAppendEnum,
  15. PutAppendReply, UniqueId,
  16. };
  17. use crate::snapshot_holder::SnapshotHolder;
  18. pub struct KVServer {
  19. me: usize,
  20. state: Mutex<KVServerState>,
  21. rf: Raft<UniqueKVOp>,
  22. keep_running: AtomicBool,
  23. logger: LocalLogger,
  24. }
  25. #[derive(Clone, Default, Serialize, Deserialize)]
  26. pub struct UniqueKVOp {
  27. op: KVOp,
  28. me: usize,
  29. unique_id: UniqueId,
  30. }
  31. #[derive(Default, Serialize, Deserialize)]
  32. struct KVServerState {
  33. kv: HashMap<String, String>,
  34. debug_kv: HashMap<String, String>,
  35. applied_op: HashMap<ClerkId, (UniqueId, CommitResult)>,
  36. #[serde(skip)]
  37. queries: HashMap<
  38. UniqueId,
  39. (
  40. Arc<ResultHolder>,
  41. futures::channel::oneshot::Sender<
  42. Result<CommitResult, CommitError>,
  43. >,
  44. ),
  45. >,
  46. }
  47. #[derive(Clone, Serialize, Deserialize)]
  48. enum KVOp {
  49. NoOp,
  50. Get(String),
  51. Put(String, String),
  52. Append(String, String),
  53. }
  54. impl Default for KVOp {
  55. fn default() -> Self {
  56. KVOp::NoOp
  57. }
  58. }
  59. struct ResultHolder {
  60. term: AtomicUsize,
  61. peeks: AtomicUsize,
  62. result: futures::future::Shared<
  63. futures::channel::oneshot::Receiver<Result<CommitResult, CommitError>>,
  64. >,
  65. }
  66. #[derive(Clone, Debug, Serialize, Deserialize)]
  67. enum CommitResult {
  68. Get(Option<String>),
  69. Put,
  70. Append,
  71. }
  72. #[derive(Clone, Debug)]
  73. enum CommitError {
  74. NotLeader,
  75. Expired(UniqueId),
  76. TimedOut,
  77. #[allow(dead_code)]
  78. Conflict,
  79. NotMe(CommitResult),
  80. Duplicate(CommitResult),
  81. }
  82. impl From<CommitError> for KVError {
  83. fn from(err: CommitError) -> Self {
  84. match err {
  85. CommitError::NotLeader => KVError::NotLeader,
  86. CommitError::Expired(_) => KVError::Expired,
  87. CommitError::TimedOut => KVError::TimedOut,
  88. CommitError::Conflict => KVError::Conflict,
  89. CommitError::NotMe(_) => panic!("NotMe is not a KVError"),
  90. CommitError::Duplicate(_) => panic!("Duplicate is not a KVError"),
  91. }
  92. }
  93. }
  94. impl KVServer {
  95. pub fn new(
  96. servers: Vec<impl RemoteRaft<UniqueKVOp> + 'static>,
  97. me: usize,
  98. persister: Arc<dyn Persister>,
  99. max_state_size_bytes: Option<usize>,
  100. ) -> Arc<Self> {
  101. let (tx, rx) = channel();
  102. let apply_command = move |message| {
  103. // Ignore apply errors.
  104. let _ = tx.send(message);
  105. };
  106. let snapshot_holder = Arc::new(SnapshotHolder::default());
  107. let snapshot_holder_clone = snapshot_holder.clone();
  108. let ret = Arc::new(Self {
  109. me,
  110. state: Default::default(),
  111. rf: Raft::new(
  112. servers,
  113. me,
  114. persister,
  115. apply_command,
  116. max_state_size_bytes,
  117. move |index| snapshot_holder_clone.request_snapshot(index),
  118. ),
  119. keep_running: AtomicBool::new(true),
  120. logger: LocalLogger::inherit(),
  121. });
  122. ret.process_command(snapshot_holder, rx);
  123. ret
  124. }
  125. fn apply_op(&self, unique_id: UniqueId, leader: usize, op: KVOp) {
  126. // The borrow checker does not allow borrowing two fields of an instance
  127. // inside a MutexGuard. But it does allow borrowing two fields of the
  128. // instance itself. Calling deref_mut() on the MutexGuard works, too!
  129. let state = &mut *self.state.lock();
  130. let (applied_op, kv) = (&mut state.applied_op, &mut state.kv);
  131. let entry = applied_op.entry(unique_id.clerk_id);
  132. if let Entry::Occupied(curr) = &entry {
  133. let (applied_unique_id, _) = curr.get();
  134. if *applied_unique_id >= unique_id {
  135. // Redelivered.
  136. // It is guaranteed that we have no pending queries with the
  137. // same unique_id, because
  138. // 1. When inserting into queries, we first check the unique_id
  139. // is strictly larger than the one in applied_op.
  140. // 2. When modifying entries in applied_op, the unique_id can
  141. // only grow larger. And we make sure there is no entries with
  142. // the same unique_id in queries.
  143. // TODO(ditsing): in case 2), make sure there is no entries in
  144. // queries that have a smaller unique_id.
  145. assert!(!state.queries.contains_key(&unique_id));
  146. return;
  147. }
  148. }
  149. let result = match op {
  150. KVOp::NoOp => return,
  151. KVOp::Get(key) => CommitResult::Get(kv.get(&key).cloned()),
  152. KVOp::Put(key, value) => {
  153. kv.insert(key, value);
  154. CommitResult::Put
  155. }
  156. KVOp::Append(key, value) => {
  157. kv.entry(key)
  158. .and_modify(|str| str.push_str(&value))
  159. .or_insert(value);
  160. CommitResult::Append
  161. }
  162. };
  163. match entry {
  164. Entry::Occupied(mut curr) => {
  165. curr.insert((unique_id, result.clone()));
  166. }
  167. Entry::Vacant(vacant) => {
  168. vacant.insert((unique_id, result.clone()));
  169. }
  170. }
  171. if let Some((_, sender)) = state.queries.remove(&unique_id) {
  172. // This KV server might not be the same leader that committed the
  173. // query. We are not sure if it is a duplicate or a conflict. To
  174. // tell the difference, terms of all queries must be stored.
  175. let _ = sender.send(if leader == self.me {
  176. Ok(result)
  177. } else {
  178. Err(CommitError::NotMe(result))
  179. });
  180. };
  181. }
  182. fn restore_state(&self, mut new_state: KVServerState) {
  183. let mut state = self.state.lock();
  184. // Cleanup all existing queries.
  185. for (_, (_, sender)) in state.queries.drain() {
  186. let _ = sender.send(Err(CommitError::NotLeader));
  187. }
  188. std::mem::swap(&mut new_state, &mut *state);
  189. }
  190. fn process_command(
  191. self: &Arc<Self>,
  192. snapshot_holder: Arc<SnapshotHolder<KVServerState>>,
  193. command_channel: Receiver<ApplyCommandMessage<UniqueKVOp>>,
  194. ) {
  195. let this = Arc::downgrade(self);
  196. let logger = LocalLogger::inherit();
  197. let me = self.me;
  198. std::thread::spawn(move || {
  199. logger.attach();
  200. log::info!("KVServer {} waiting for commands ...", me);
  201. while let Ok(message) = command_channel.recv() {
  202. if let Some(this) = this.upgrade() {
  203. match message {
  204. ApplyCommandMessage::Snapshot(snapshot) => {
  205. let state = snapshot_holder.load_snapshot(snapshot);
  206. this.restore_state(state);
  207. }
  208. ApplyCommandMessage::Command(index, command) => {
  209. this.apply_op(
  210. command.unique_id,
  211. command.me,
  212. command.op,
  213. );
  214. if let Some(snapshot) = snapshot_holder
  215. .take_snapshot(&this.state.lock(), index)
  216. {
  217. this.rf.save_snapshot(snapshot);
  218. }
  219. }
  220. }
  221. } else {
  222. break;
  223. }
  224. }
  225. log::info!("KVServer {} stopped waiting for commands.", me);
  226. });
  227. }
  228. const UNSEEN_TERM: usize = 0;
  229. const ATTEMPTING_TERM: usize = usize::MAX;
  230. async fn block_for_commit(
  231. &self,
  232. unique_id: UniqueId,
  233. op: KVOp,
  234. timeout: Duration,
  235. ) -> Result<CommitResult, CommitError> {
  236. if !self.keep_running.load(Ordering::SeqCst) {
  237. return Err(CommitError::NotLeader);
  238. }
  239. let result_holder = {
  240. let mut state = self.state.lock();
  241. let applied = state.applied_op.get(&unique_id.clerk_id);
  242. if let Some((applied_unique_id, result)) = applied {
  243. #[allow(clippy::comparison_chain)]
  244. if unique_id < *applied_unique_id {
  245. return Err(CommitError::Expired(unique_id));
  246. } else if unique_id == *applied_unique_id {
  247. return Err(CommitError::Duplicate(result.clone()));
  248. }
  249. };
  250. let entry = state.queries.entry(unique_id).or_insert_with(|| {
  251. let (tx, rx) = futures::channel::oneshot::channel();
  252. (
  253. Arc::new(ResultHolder {
  254. term: AtomicUsize::new(Self::UNSEEN_TERM),
  255. peeks: AtomicUsize::new(0),
  256. result: rx.shared(),
  257. }),
  258. tx,
  259. )
  260. });
  261. entry.0.clone()
  262. };
  263. let (Term(hold_term), is_leader) = self.rf.get_state();
  264. if !is_leader {
  265. return Err(CommitError::NotLeader);
  266. }
  267. Self::validate_term(hold_term);
  268. let set = result_holder.term.compare_exchange(
  269. Self::UNSEEN_TERM,
  270. Self::ATTEMPTING_TERM,
  271. Ordering::SeqCst,
  272. Ordering::SeqCst,
  273. );
  274. let start = match set {
  275. // Nobody has attempted start() yet.
  276. Ok(Self::UNSEEN_TERM) => true,
  277. Ok(_) => panic!(
  278. "compare_exchange should always return the current value 0"
  279. ),
  280. // Somebody is attempting start().
  281. Err(Self::ATTEMPTING_TERM) => false,
  282. // Somebody has attempted start().
  283. Err(prev_term) if prev_term < hold_term => {
  284. let set = result_holder.term.compare_exchange(
  285. prev_term,
  286. Self::ATTEMPTING_TERM,
  287. Ordering::SeqCst,
  288. Ordering::SeqCst,
  289. );
  290. set.is_ok()
  291. }
  292. _ => false,
  293. };
  294. if start {
  295. let op = UniqueKVOp {
  296. op,
  297. me: self.me,
  298. unique_id,
  299. };
  300. let start = log_with!(self.logger, self.rf.start(op));
  301. let start_term =
  302. start.map_or(Self::UNSEEN_TERM, |(Term(term), _)| {
  303. Self::validate_term(term);
  304. term
  305. });
  306. let set = result_holder.term.compare_exchange(
  307. Self::ATTEMPTING_TERM,
  308. start_term,
  309. Ordering::SeqCst,
  310. Ordering::SeqCst,
  311. );
  312. // Setting term must have been successful, and must return the
  313. // value previously set by this attempt.
  314. assert_eq!(set, Ok(Self::ATTEMPTING_TERM));
  315. if start_term == Self::UNSEEN_TERM {
  316. return Err(CommitError::NotLeader);
  317. }
  318. }
  319. let result = result_holder.result.clone();
  320. // Wait for the op to be committed.
  321. let result = tokio::time::timeout(timeout, result).await;
  322. match result {
  323. Ok(Ok(Ok(result))) => {
  324. // If the result is OK, all other requests should see "Duplicate".
  325. if result_holder.peeks.fetch_add(1, Ordering::Relaxed) == 0 {
  326. Ok(result)
  327. } else {
  328. Err(CommitError::Duplicate(result))
  329. }
  330. }
  331. Ok(Ok(Err(e))) => Err(e),
  332. Ok(Err(_)) => Err(CommitError::NotLeader),
  333. Err(_) => Err(CommitError::TimedOut),
  334. }
  335. }
  336. fn validate_term(term: usize) {
  337. assert!(term > Self::UNSEEN_TERM, "Term must be larger than 0.");
  338. assert!(
  339. term < Self::ATTEMPTING_TERM,
  340. "Term must be smaller than usize::MAX."
  341. );
  342. }
  343. const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
  344. pub async fn get(&self, args: GetArgs) -> GetReply {
  345. let map_dup = match args.op {
  346. GetEnum::AllowDuplicate => |r| Ok(r),
  347. GetEnum::NoDuplicate => |_| Err(KVError::Conflict),
  348. };
  349. let result_fut = self.block_for_commit(
  350. args.unique_id,
  351. KVOp::Get(args.key),
  352. Self::DEFAULT_TIMEOUT,
  353. );
  354. let result = match result_fut.await {
  355. Ok(result) => Ok(result),
  356. Err(CommitError::Duplicate(result)) => map_dup(result),
  357. Err(CommitError::NotMe(result)) => map_dup(result),
  358. Err(e) => Err(e.into()),
  359. };
  360. let result = match result {
  361. Ok(result) => result,
  362. Err(e) => return GetReply { result: Err(e) },
  363. };
  364. let result = match result {
  365. CommitResult::Get(result) => Ok(result),
  366. CommitResult::Put => Err(KVError::Conflict),
  367. CommitResult::Append => Err(KVError::Conflict),
  368. };
  369. GetReply { result }
  370. }
  371. pub async fn put_append(&self, args: PutAppendArgs) -> PutAppendReply {
  372. let op = match args.op {
  373. PutAppendEnum::Put => KVOp::Put(args.key, args.value),
  374. PutAppendEnum::Append => KVOp::Append(args.key, args.value),
  375. };
  376. let result_fut =
  377. self.block_for_commit(args.unique_id, op, Self::DEFAULT_TIMEOUT);
  378. let result = match result_fut.await {
  379. Ok(result) => result,
  380. Err(CommitError::Duplicate(result)) => result,
  381. Err(CommitError::NotMe(result)) => result,
  382. Err(e) => {
  383. return PutAppendReply {
  384. result: Err(e.into()),
  385. }
  386. }
  387. };
  388. let result = match result {
  389. CommitResult::Put => {
  390. if args.op == PutAppendEnum::Put {
  391. Ok(())
  392. } else {
  393. Err(KVError::Conflict)
  394. }
  395. }
  396. CommitResult::Append => {
  397. if args.op == PutAppendEnum::Append {
  398. Ok(())
  399. } else {
  400. Err(KVError::Conflict)
  401. }
  402. }
  403. CommitResult::Get(_) => Err(KVError::Conflict),
  404. };
  405. PutAppendReply { result }
  406. }
  407. pub fn raft(&self) -> &Raft<UniqueKVOp> {
  408. &self.rf
  409. }
  410. pub fn kill(self: Arc<Self>) {
  411. // Return error to new queries.
  412. self.keep_running.store(false, Ordering::SeqCst);
  413. // Cancel all in-flight queries.
  414. for (_, (_, sender)) in self.state.lock().queries.drain() {
  415. let _ = sender.send(Err(CommitError::NotLeader));
  416. }
  417. let rf = self.raft().clone();
  418. // We must drop self to remove the only clone of raft, so that
  419. // `rf.kill()` does not block.
  420. drop(self);
  421. rf.kill();
  422. // The process_command thread will exit, after Raft drops the reference
  423. // to the sender.
  424. }
  425. }