service_test.rs 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. #[macro_use]
  2. extern crate anyhow;
  3. extern crate kvraft;
  4. extern crate rand;
  5. #[macro_use]
  6. extern crate scopeguard;
  7. use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
  8. use std::sync::Arc;
  9. use std::thread::JoinHandle;
  10. use std::time::Duration;
  11. use anyhow::Context;
  12. use rand::{thread_rng, Rng};
  13. use kvraft::testing_utils::config::{
  14. make_config, sleep_election_timeouts, sleep_millis, Config,
  15. };
  16. use kvraft::Clerk;
  17. fn spawn_clients<T, Func>(
  18. config: Arc<Config>,
  19. clients: usize,
  20. func: Func,
  21. ) -> Vec<JoinHandle<T>>
  22. where
  23. T: 'static + Send,
  24. Func: 'static + Clone + Send + Sync + Fn(usize, Clerk) -> T,
  25. {
  26. let mut client_threads = vec![];
  27. for i in 0..clients {
  28. let clerk = config.make_clerk();
  29. let func = func.clone();
  30. client_threads.push(std::thread::spawn(move || func(i, clerk)))
  31. }
  32. eprintln!("spawning clients done.");
  33. client_threads
  34. }
  35. fn appending_client(
  36. index: usize,
  37. mut clerk: Clerk,
  38. stop: Arc<AtomicBool>,
  39. ) -> (usize, String) {
  40. eprintln!("client {} running.", index);
  41. let mut op_count = 0usize;
  42. let key = index.to_string();
  43. let mut last = String::new();
  44. let mut rng = thread_rng();
  45. clerk.put(&key, &last);
  46. while !stop.load(Ordering::Acquire) {
  47. eprintln!("client {} starting {}.", index, op_count);
  48. if rng.gen_ratio(1, 2) {
  49. let value = format!("({}, {}), ", index, op_count);
  50. last.push_str(&value);
  51. clerk.append(&key, &value);
  52. op_count += 1;
  53. } else {
  54. let value = clerk
  55. .get(&key)
  56. .expect(&format!("Key {} should exist.", index));
  57. assert_eq!(value, last);
  58. }
  59. eprintln!("client {} done {}.", index, op_count);
  60. }
  61. eprintln!("client {} done.", index);
  62. (op_count, last)
  63. }
  64. fn generic_test(clients: usize, unreliable: bool, maxraftstate: usize) {
  65. const SERVERS: usize = 5;
  66. let cfg = Arc::new(make_config(SERVERS, unreliable, maxraftstate));
  67. // TODO(ditsing): add `defer!(cfg.clean_up());`
  68. cfg.begin("");
  69. let mut clerk = cfg.make_clerk();
  70. const ROUNDS: usize = 3;
  71. for _ in 0..ROUNDS {
  72. // Network partition thread.
  73. let partition_stop = Arc::new(AtomicBool::new(false));
  74. // KV server clients.
  75. let clients_stop = Arc::new(AtomicBool::new(false));
  76. let config = cfg.clone();
  77. let clients_stop_clone = clients_stop.clone();
  78. let spawn_client_results = std::thread::spawn(move || {
  79. spawn_clients(config, clients, move |index: usize, clerk: Clerk| {
  80. appending_client(index, clerk, clients_stop_clone.clone())
  81. })
  82. });
  83. std::thread::sleep(Duration::from_secs(5));
  84. // Stop partitions.
  85. partition_stop.store(true, Ordering::Release);
  86. // Tell all clients to stop.
  87. clients_stop.store(true, Ordering::Release);
  88. let client_results = spawn_client_results
  89. .join()
  90. .expect("Spawning clients should never fail.");
  91. for (index, client_result) in client_results.into_iter().enumerate() {
  92. let (op_count, last_result) =
  93. client_result.join().expect("Client should never fail.");
  94. let real_result = clerk
  95. .get(index.to_string())
  96. .expect(&format!("Key {} should exist.", index));
  97. assert_eq!(real_result, last_result);
  98. assert!(
  99. op_count > 10,
  100. "Client committed only {} operations",
  101. op_count
  102. );
  103. }
  104. }
  105. cfg.end();
  106. }
  107. fn check_concurrent_results(
  108. value: String,
  109. clients: usize,
  110. expected: Vec<usize>,
  111. ) -> anyhow::Result<()> {
  112. if !value.starts_with('(') || !value.ends_with(')') {
  113. bail!("Malformed value string {}", value)
  114. }
  115. let inner_value = &value[1..value.len() - 1];
  116. let mut progress = vec![0; clients];
  117. for pair_str in inner_value.split(")(") {
  118. let mut nums = vec![];
  119. for num_str in pair_str.split(", ") {
  120. let num: usize = num_str.parse().context(format!(
  121. "Parsing '{:?}' failed within '{:?}'",
  122. num_str, value,
  123. ))?;
  124. nums.push(num);
  125. }
  126. if nums.len() != 2 {
  127. bail!(
  128. concat!(
  129. "More than two numbers in the same group when",
  130. " parsing '{:?}' failed within '{:?}'",
  131. ),
  132. pair_str,
  133. value,
  134. );
  135. }
  136. let (client, curr) = (nums[0], nums[1]);
  137. if progress[client] != curr {
  138. bail!(
  139. "Client {} failed, expecting {}, got {}, others are {:?} in {}",
  140. client,
  141. progress[client],
  142. curr,
  143. progress,
  144. value,
  145. )
  146. }
  147. progress[client] = curr + 1;
  148. }
  149. assert_eq!(progress, expected, "Expecting progress in {}", value);
  150. Ok(())
  151. }
  152. #[test]
  153. fn basic_service() {
  154. generic_test(1, false, 0);
  155. }
  156. #[test]
  157. fn concurrent_client() {
  158. generic_test(5, false, 0);
  159. }
  160. #[test]
  161. fn unreliable() {
  162. generic_test(5, true, 0);
  163. }
  164. #[test]
  165. fn unreliable_one_key_many_clients() -> anyhow::Result<()> {
  166. const SERVERS: usize = 5;
  167. let cfg = Arc::new(make_config(SERVERS, true, 0));
  168. let mut clerk = cfg.make_clerk();
  169. cfg.begin("Test: concurrent append to same key, unreliable (3A)");
  170. clerk.put("k", "");
  171. const CLIENTS: usize = 5;
  172. const ATTEMPTS: usize = 10;
  173. let client_results = spawn_clients(cfg, CLIENTS, |index, mut clerk| {
  174. for i in 0..ATTEMPTS {
  175. clerk.append("k", format!("({}, {})", index, i));
  176. }
  177. });
  178. for client_result in client_results {
  179. client_result.join().expect("Client should never fail");
  180. }
  181. let value = clerk.get("k").expect("Key should exist");
  182. check_concurrent_results(value, CLIENTS, vec![ATTEMPTS; CLIENTS])
  183. }
  184. #[test]
  185. fn one_partition() -> anyhow::Result<()> {
  186. const SERVERS: usize = 5;
  187. let cfg = Arc::new(make_config(SERVERS, false, 0));
  188. cfg.begin("Test: progress in majority (3A)");
  189. const KEY: &str = "1";
  190. let mut clerk = cfg.make_clerk();
  191. clerk.put(KEY, "13");
  192. let (majority, minority) = cfg.make_partition();
  193. assert!(minority.len() < majority.len());
  194. assert_eq!(minority.len() + majority.len(), SERVERS);
  195. cfg.partition(&majority, &minority);
  196. let mut clerk_majority = cfg.make_limited_clerk(&majority);
  197. let mut clerk_minority1 = cfg.make_limited_clerk(&minority);
  198. let mut clerk_minority2 = cfg.make_limited_clerk(&minority);
  199. clerk_majority.put(KEY, "14");
  200. assert_eq!(clerk_majority.get(KEY), Some("14".to_owned()));
  201. cfg.begin("Test: no progress in minority (3A)");
  202. let counter = Arc::new(AtomicUsize::new(0));
  203. let counter1 = counter.clone();
  204. std::thread::spawn(move || {
  205. clerk_minority1.put(KEY, "15");
  206. counter1.fetch_or(1, Ordering::SeqCst);
  207. });
  208. let counter2 = counter.clone();
  209. std::thread::spawn(move || {
  210. clerk_minority2.get(KEY);
  211. counter2.fetch_or(2, Ordering::SeqCst);
  212. });
  213. sleep_millis(1000);
  214. assert_eq!(counter.load(Ordering::SeqCst), 0);
  215. assert_eq!(clerk_majority.get(KEY), Some("14".to_owned()));
  216. clerk_majority.put(KEY, "16");
  217. assert_eq!(clerk_majority.get(KEY), Some("16".to_owned()));
  218. cfg.begin("Test: completion after heal (3A)");
  219. cfg.connect_all();
  220. cfg.connect_all_clerks();
  221. sleep_election_timeouts(1);
  222. for _ in 0..100 {
  223. sleep_millis(60);
  224. if counter.load(Ordering::SeqCst) == 3 {
  225. break;
  226. }
  227. }
  228. assert_eq!(counter.load(Ordering::SeqCst), 3);
  229. assert_eq!(clerk.get(KEY), Some("15".to_owned()));
  230. Ok(())
  231. }