// service_test.rs (11 KB)
  1. #[macro_use]
  2. extern crate anyhow;
  3. extern crate kvraft;
  4. extern crate rand;
  5. #[macro_use]
  6. extern crate scopeguard;
  7. use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
  8. use std::sync::Arc;
  9. use std::thread::JoinHandle;
  10. use std::time::Duration;
  11. use anyhow::Context;
  12. use rand::{thread_rng, Rng};
  13. use kvraft::testing_utils::config::{
  14. make_config, sleep_election_timeouts, sleep_millis, Config,
  15. LONG_ELECTION_TIMEOUT_MILLIS,
  16. };
  17. use kvraft::Clerk;
  18. fn spawn_clients<T, Func>(
  19. config: Arc<Config>,
  20. clients: usize,
  21. func: Func,
  22. ) -> Vec<JoinHandle<T>>
  23. where
  24. T: 'static + Send,
  25. Func: 'static + Clone + Send + Sync + Fn(usize, Clerk) -> T,
  26. {
  27. let mut client_threads = vec![];
  28. for i in 0..clients {
  29. let clerk = config.make_clerk();
  30. let func = func.clone();
  31. client_threads.push(std::thread::spawn(move || func(i, clerk)))
  32. }
  33. eprintln!("spawning clients done.");
  34. client_threads
  35. }
  36. fn appending_client(
  37. index: usize,
  38. mut clerk: Clerk,
  39. stop: Arc<AtomicBool>,
  40. ) -> (usize, String) {
  41. eprintln!("client {} running.", index);
  42. let mut op_count = 0usize;
  43. let key = index.to_string();
  44. let mut last = String::new();
  45. let mut rng = thread_rng();
  46. clerk.put(&key, &last);
  47. while !stop.load(Ordering::Acquire) {
  48. eprintln!("client {} starting {}.", index, op_count);
  49. if rng.gen_ratio(1, 2) {
  50. let value = format!("({}, {}), ", index, op_count);
  51. last.push_str(&value);
  52. clerk.append(&key, &value);
  53. op_count += 1;
  54. } else {
  55. let value = clerk
  56. .get(&key)
  57. .expect(&format!("Key {} should exist.", index));
  58. assert_eq!(value, last);
  59. }
  60. eprintln!("client {} done {}.", index, op_count);
  61. }
  62. eprintln!("client {} done.", index);
  63. (op_count, last)
  64. }
  65. const PARTITION_MAX_DELAY_MILLIS: u64 = 200;
  66. fn run_partition(cfg: Arc<Config>, stop: Arc<AtomicBool>) {
  67. while !stop.load(Ordering::Acquire) {
  68. let mut indexes = cfg.shuffled_indexes();
  69. let len = indexes.len();
  70. cfg.partition(&(indexes.split_off(len / 2)), &indexes);
  71. let delay = thread_rng().gen_range(
  72. LONG_ELECTION_TIMEOUT_MILLIS
  73. ..LONG_ELECTION_TIMEOUT_MILLIS + PARTITION_MAX_DELAY_MILLIS,
  74. );
  75. std::thread::sleep(Duration::from_millis(delay));
  76. }
  77. }
  78. #[derive(Default)]
  79. struct GenericTestParams {
  80. clients: usize,
  81. unreliable: bool,
  82. partition: bool,
  83. crash: bool,
  84. maxraftstate: Option<usize>,
  85. }
  86. fn generic_test(test_params: GenericTestParams) {
  87. let GenericTestParams {
  88. clients,
  89. unreliable,
  90. partition,
  91. crash,
  92. maxraftstate,
  93. } = test_params;
  94. let maxraftstate = maxraftstate.unwrap_or(usize::MAX);
  95. const SERVERS: usize = 5;
  96. let cfg = Arc::new(make_config(SERVERS, unreliable, maxraftstate));
  97. defer!(cfg.clean_up());
  98. cfg.begin("");
  99. let mut clerk = cfg.make_clerk();
  100. const ROUNDS: usize = 3;
  101. for _ in 0..ROUNDS {
  102. // Network partition thread.
  103. let partition_stop = Arc::new(AtomicBool::new(false));
  104. // KV server clients.
  105. let clients_stop = Arc::new(AtomicBool::new(false));
  106. let config = cfg.clone();
  107. let clients_stop_clone = clients_stop.clone();
  108. let spawn_client_results = std::thread::spawn(move || {
  109. spawn_clients(config, clients, move |index: usize, clerk: Clerk| {
  110. appending_client(index, clerk, clients_stop_clone.clone())
  111. })
  112. });
  113. let partition_result = if partition {
  114. // Let the clients perform some operations without interruption.
  115. sleep_millis(1000);
  116. let config = cfg.clone();
  117. let partition_stop_clone = partition_stop.clone();
  118. Some(std::thread::spawn(|| {
  119. run_partition(config, partition_stop_clone)
  120. }))
  121. } else {
  122. None
  123. };
  124. if crash {
  125. cfg.crash_all();
  126. sleep_election_timeouts(1);
  127. cfg.restart_all();
  128. }
  129. std::thread::sleep(Duration::from_secs(5));
  130. // Stop partitions.
  131. partition_stop.store(true, Ordering::Release);
  132. partition_result.map(|result| {
  133. result.join().expect("Partition thread should never fail");
  134. cfg.connect_all();
  135. sleep_election_timeouts(1);
  136. });
  137. // Tell all clients to stop.
  138. clients_stop.store(true, Ordering::Release);
  139. let client_results = spawn_client_results
  140. .join()
  141. .expect("Spawning clients should never fail.");
  142. for (index, client_result) in client_results.into_iter().enumerate() {
  143. let (op_count, last_result) =
  144. client_result.join().expect("Client should never fail.");
  145. let real_result = clerk
  146. .get(index.to_string())
  147. .expect(&format!("Key {} should exist.", index));
  148. assert_eq!(real_result, last_result);
  149. eprintln!("Client {} committed {} operations", index, op_count);
  150. assert!(op_count > 10, "Client committed less than 10 operations");
  151. }
  152. }
  153. cfg.end();
  154. }
  155. fn check_concurrent_results(
  156. value: String,
  157. clients: usize,
  158. expected: Vec<usize>,
  159. ) -> anyhow::Result<()> {
  160. if !value.starts_with('(') || !value.ends_with(')') {
  161. bail!("Malformed value string {}", value)
  162. }
  163. let inner_value = &value[1..value.len() - 1];
  164. let mut progress = vec![0; clients];
  165. for pair_str in inner_value.split(")(") {
  166. let mut nums = vec![];
  167. for num_str in pair_str.split(", ") {
  168. let num: usize = num_str.parse().context(format!(
  169. "Parsing '{:?}' failed within '{:?}'",
  170. num_str, value,
  171. ))?;
  172. nums.push(num);
  173. }
  174. if nums.len() != 2 {
  175. bail!(
  176. concat!(
  177. "More than two numbers in the same group when",
  178. " parsing '{:?}' failed within '{:?}'",
  179. ),
  180. pair_str,
  181. value,
  182. );
  183. }
  184. let (client, curr) = (nums[0], nums[1]);
  185. if progress[client] != curr {
  186. bail!(
  187. "Client {} failed, expecting {}, got {}, others are {:?} in {}",
  188. client,
  189. progress[client],
  190. curr,
  191. progress,
  192. value,
  193. )
  194. }
  195. progress[client] = curr + 1;
  196. }
  197. assert_eq!(progress, expected, "Expecting progress in {}", value);
  198. Ok(())
  199. }
  200. #[test]
  201. fn basic_service() {
  202. generic_test(GenericTestParams {
  203. clients: 1,
  204. ..Default::default()
  205. });
  206. }
  207. #[test]
  208. fn concurrent_client() {
  209. generic_test(GenericTestParams {
  210. clients: 5,
  211. ..Default::default()
  212. });
  213. }
  214. #[test]
  215. fn unreliable_many_clients() {
  216. generic_test(GenericTestParams {
  217. clients: 5,
  218. unreliable: true,
  219. ..Default::default()
  220. });
  221. }
  222. #[test]
  223. fn unreliable_one_key_many_clients() -> anyhow::Result<()> {
  224. const SERVERS: usize = 5;
  225. let cfg = Arc::new(make_config(SERVERS, true, 0));
  226. defer!(cfg.clean_up());
  227. let mut clerk = cfg.make_clerk();
  228. cfg.begin("Test: concurrent append to same key, unreliable (3A)");
  229. clerk.put("k", "");
  230. const CLIENTS: usize = 5;
  231. const ATTEMPTS: usize = 10;
  232. let client_results =
  233. spawn_clients(cfg.clone(), CLIENTS, |index, mut clerk| {
  234. for i in 0..ATTEMPTS {
  235. clerk.append("k", format!("({}, {})", index, i));
  236. }
  237. });
  238. for client_result in client_results {
  239. client_result.join().expect("Client should never fail");
  240. }
  241. let value = clerk.get("k").expect("Key should exist");
  242. check_concurrent_results(value, CLIENTS, vec![ATTEMPTS; CLIENTS])
  243. }
  244. #[test]
  245. fn one_partition() -> anyhow::Result<()> {
  246. const SERVERS: usize = 5;
  247. let cfg = Arc::new(make_config(SERVERS, false, 0));
  248. defer!(cfg.clean_up());
  249. cfg.begin("Test: progress in majority (3A)");
  250. const KEY: &str = "1";
  251. let mut clerk = cfg.make_clerk();
  252. clerk.put(KEY, "13");
  253. let (majority, minority) = cfg.make_partition();
  254. assert!(minority.len() < majority.len());
  255. assert_eq!(minority.len() + majority.len(), SERVERS);
  256. cfg.partition(&majority, &minority);
  257. let mut clerk_majority = cfg.make_limited_clerk(&majority);
  258. let mut clerk_minority1 = cfg.make_limited_clerk(&minority);
  259. let mut clerk_minority2 = cfg.make_limited_clerk(&minority);
  260. clerk_majority.put(KEY, "14");
  261. assert_eq!(clerk_majority.get(KEY), Some("14".to_owned()));
  262. cfg.begin("Test: no progress in minority (3A)");
  263. let counter = Arc::new(AtomicUsize::new(0));
  264. let counter1 = counter.clone();
  265. std::thread::spawn(move || {
  266. clerk_minority1.put(KEY, "15");
  267. counter1.fetch_or(1, Ordering::SeqCst);
  268. });
  269. let counter2 = counter.clone();
  270. std::thread::spawn(move || {
  271. clerk_minority2.get(KEY);
  272. counter2.fetch_or(2, Ordering::SeqCst);
  273. });
  274. sleep_millis(1000);
  275. assert_eq!(counter.load(Ordering::SeqCst), 0);
  276. assert_eq!(clerk_majority.get(KEY), Some("14".to_owned()));
  277. clerk_majority.put(KEY, "16");
  278. assert_eq!(clerk_majority.get(KEY), Some("16".to_owned()));
  279. cfg.begin("Test: completion after heal (3A)");
  280. cfg.connect_all();
  281. cfg.connect_all_clerks();
  282. sleep_election_timeouts(1);
  283. for _ in 0..100 {
  284. sleep_millis(60);
  285. if counter.load(Ordering::SeqCst) == 3 {
  286. break;
  287. }
  288. }
  289. assert_eq!(counter.load(Ordering::SeqCst), 3);
  290. assert_eq!(clerk.get(KEY), Some("15".to_owned()));
  291. Ok(())
  292. }
  293. #[test]
  294. fn many_partitions_one_client() {
  295. generic_test(GenericTestParams {
  296. clients: 1,
  297. partition: true,
  298. ..Default::default()
  299. });
  300. }
  301. #[test]
  302. fn many_partitions_many_client() {
  303. generic_test(GenericTestParams {
  304. clients: 5,
  305. partition: true,
  306. ..Default::default()
  307. });
  308. }
  309. #[test]
  310. fn persist_one_client() {
  311. generic_test(GenericTestParams {
  312. clients: 1,
  313. crash: true,
  314. ..Default::default()
  315. });
  316. }
  317. #[test]
  318. fn persist_concurrent() {
  319. generic_test(GenericTestParams {
  320. clients: 5,
  321. crash: true,
  322. ..Default::default()
  323. });
  324. }
  325. #[test]
  326. fn persist_concurrent_unreliable() {
  327. generic_test(GenericTestParams {
  328. clients: 5,
  329. unreliable: true,
  330. crash: true,
  331. ..Default::default()
  332. });
  333. }
  334. #[test]
  335. fn persist_partition() {
  336. generic_test(GenericTestParams {
  337. clients: 5,
  338. partition: true,
  339. crash: true,
  340. ..Default::default()
  341. });
  342. }
  343. #[test]
  344. fn persist_partition_unreliable() {
  345. generic_test(GenericTestParams {
  346. clients: 5,
  347. unreliable: true,
  348. partition: true,
  349. crash: true,
  350. ..Default::default()
  351. });
  352. }