service_test.rs 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. #[macro_use]
  2. extern crate anyhow;
  3. extern crate kvraft;
  4. extern crate rand;
  5. #[macro_use]
  6. extern crate scopeguard;
  7. use std::sync::atomic::{AtomicBool, Ordering};
  8. use std::sync::Arc;
  9. use std::thread::JoinHandle;
  10. use std::time::Duration;
  11. use rand::{thread_rng, Rng};
  12. use anyhow::Context;
  13. use kvraft::testing_utils::config::{make_config, Config};
  14. use kvraft::Clerk;
  15. fn spawn_clients<T, Func>(
  16. config: Arc<Config>,
  17. clients: usize,
  18. func: Func,
  19. ) -> Vec<JoinHandle<T>>
  20. where
  21. T: 'static + Send,
  22. Func: 'static + Clone + Send + Sync + Fn(usize, Clerk) -> T,
  23. {
  24. let mut client_threads = vec![];
  25. for i in 0..clients {
  26. let clerk = config.make_clerk();
  27. let func = func.clone();
  28. client_threads.push(std::thread::spawn(move || func(i, clerk)))
  29. }
  30. eprintln!("spawning clients done.");
  31. client_threads
  32. }
  33. fn appending_client(
  34. index: usize,
  35. mut clerk: Clerk,
  36. stop: Arc<AtomicBool>,
  37. ) -> (usize, String) {
  38. eprintln!("client {} running.", index);
  39. let mut op_count = 0usize;
  40. let key = index.to_string();
  41. let mut last = String::new();
  42. let mut rng = thread_rng();
  43. clerk.put(&key, &last);
  44. while !stop.load(Ordering::Acquire) {
  45. eprintln!("client {} starting {}.", index, op_count);
  46. if rng.gen_ratio(1, 2) {
  47. let value = format!("({}, {}), ", index, op_count);
  48. last.push_str(&value);
  49. clerk.append(&key, &value);
  50. op_count += 1;
  51. } else {
  52. let value = clerk
  53. .get(&key)
  54. .expect(&format!("Key {} should exist.", index));
  55. assert_eq!(value, last);
  56. }
  57. eprintln!("client {} done {}.", index, op_count);
  58. }
  59. eprintln!("client {} done.", index);
  60. (op_count, last)
  61. }
  62. fn generic_test(clients: usize, unreliable: bool, maxraftstate: usize) {
  63. const SERVERS: usize = 5;
  64. let cfg = Arc::new(make_config(SERVERS, unreliable, maxraftstate));
  65. // TODO(ditsing): add `defer!(cfg.clean_up());`
  66. cfg.begin("");
  67. let mut clerk = cfg.make_clerk();
  68. const ROUNDS: usize = 3;
  69. for _ in 0..ROUNDS {
  70. // Network partition thread.
  71. let partition_stop = Arc::new(AtomicBool::new(false));
  72. // KV server clients.
  73. let clients_stop = Arc::new(AtomicBool::new(false));
  74. let config = cfg.clone();
  75. let clients_stop_clone = clients_stop.clone();
  76. let spawn_client_results = std::thread::spawn(move || {
  77. spawn_clients(config, clients, move |index: usize, clerk: Clerk| {
  78. appending_client(index, clerk, clients_stop_clone.clone())
  79. })
  80. });
  81. std::thread::sleep(Duration::from_secs(5));
  82. // Stop partitions.
  83. partition_stop.store(true, Ordering::Release);
  84. // Tell all clients to stop.
  85. clients_stop.store(true, Ordering::Release);
  86. let client_results = spawn_client_results
  87. .join()
  88. .expect("Spawning clients should never fail.");
  89. for (index, client_result) in client_results.into_iter().enumerate() {
  90. let (op_count, last_result) =
  91. client_result.join().expect("Client should never fail.");
  92. let real_result = clerk
  93. .get(index.to_string())
  94. .expect(&format!("Key {} should exist.", index));
  95. assert_eq!(real_result, last_result);
  96. assert!(
  97. op_count > 10,
  98. "Client committed only {} operations",
  99. op_count
  100. );
  101. }
  102. }
  103. cfg.end();
  104. }
  105. fn check_concurrent_results(
  106. value: String,
  107. clients: usize,
  108. expected: Vec<usize>,
  109. ) -> anyhow::Result<()> {
  110. if !value.starts_with('(') || !value.ends_with(')') {
  111. bail!("Malformed value string {}", value)
  112. }
  113. let inner_value = &value[1..value.len() - 1];
  114. let mut progress = vec![0; clients];
  115. for pair_str in inner_value.split(")(") {
  116. let mut nums = vec![];
  117. for num_str in pair_str.split(", ") {
  118. let num: usize = num_str.parse().context(format!(
  119. "Parsing '{:?}' failed within '{:?}'",
  120. num_str, value,
  121. ))?;
  122. nums.push(num);
  123. }
  124. if nums.len() != 2 {
  125. bail!(
  126. concat!(
  127. "More than two numbers in the same group when",
  128. " parsing '{:?}' failed within '{:?}'",
  129. ),
  130. pair_str,
  131. value,
  132. );
  133. }
  134. let (client, curr) = (nums[0], nums[1]);
  135. if progress[client] != curr {
  136. bail!(
  137. "Client {} failed, expecting {}, got {}, others are {:?} in {}",
  138. client,
  139. progress[client],
  140. curr,
  141. progress,
  142. value,
  143. )
  144. }
  145. progress[client] = curr + 1;
  146. }
  147. assert_eq!(progress, expected, "Expecting progress in {}", value);
  148. Ok(())
  149. }
  150. #[test]
  151. fn basic_service() {
  152. generic_test(1, false, 0);
  153. }
  154. #[test]
  155. fn concurrent_client() {
  156. generic_test(5, false, 0);
  157. }
  158. #[test]
  159. fn unreliable() {
  160. generic_test(5, true, 0);
  161. }
  162. #[test]
  163. fn unreliable_one_key() -> anyhow::Result<()> {
  164. const SERVERS: usize = 5;
  165. let cfg = Arc::new(make_config(SERVERS, true, 0));
  166. let mut clerk = cfg.make_clerk();
  167. cfg.begin("Test: concurrent append to same key, unreliable (3A)");
  168. clerk.put("k", "");
  169. const CLIENTS: usize = 5;
  170. const ATTEMPTS: usize = 10;
  171. let client_results = spawn_clients(cfg, CLIENTS, |index, mut clerk| {
  172. for i in 0..ATTEMPTS {
  173. clerk.append("k", format!("({}, {})", index, i));
  174. }
  175. });
  176. for client_result in client_results {
  177. client_result.join().expect("Client should never fail");
  178. }
  179. let value = clerk.get("k").expect("Key should exist");
  180. check_concurrent_results(value, CLIENTS, vec![ATTEMPTS; CLIENTS])
  181. }