service_test.rs 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. use std::sync::atomic::{AtomicUsize, Ordering};
  2. use std::sync::Arc;
  3. use scopeguard::defer;
  4. use test_configs::kvraft::config::make_config;
  5. use test_configs::kvraft::generic_test::{
  6. generic_test, spawn_clients, GenericTestParams,
  7. };
  8. use test_configs::utils::{sleep_election_timeouts, sleep_millis};
  9. use test_utils::init_test_log;
  10. use test_utils::thread_local_logger::LocalLogger;
  11. type Result = std::result::Result<(), String>;
  12. fn check_concurrent_results(
  13. value: String,
  14. clients: usize,
  15. expected: Vec<usize>,
  16. ) -> Result {
  17. if !value.starts_with('(') || !value.ends_with(')') {
  18. return Err(format!("Malformed value string {}", value));
  19. }
  20. let inner_value = &value[1..value.len() - 1];
  21. let mut progress = vec![0; clients];
  22. for pair_str in inner_value.split(")(") {
  23. let mut nums = vec![];
  24. for num_str in pair_str.split(", ") {
  25. let num: usize = num_str.parse().map_err(|_e| {
  26. format!("Parsing '{:?}' failed within '{:?}'", num_str, value)
  27. })?;
  28. nums.push(num);
  29. }
  30. if nums.len() != 2 {
  31. return Err(format!(
  32. concat!(
  33. "More than two numbers in the same group when",
  34. " parsing '{:?}' failed within '{:?}'",
  35. ),
  36. pair_str, value,
  37. ));
  38. }
  39. let (client, curr) = (nums[0], nums[1]);
  40. if progress[client] != curr {
  41. return Err(format!(
  42. "Client {} failed, expecting {}, got {}, others are {:?} in {}",
  43. client, progress[client], curr, progress, value,
  44. ));
  45. }
  46. progress[client] = curr + 1;
  47. }
  48. assert_eq!(progress, expected, "Expecting progress in {}", value);
  49. Ok(())
  50. }
  51. #[test]
  52. fn basic_service() {
  53. init_test_log!();
  54. generic_test(GenericTestParams {
  55. clients: 1,
  56. ..Default::default()
  57. });
  58. }
  59. #[test]
  60. fn concurrent_client() {
  61. init_test_log!();
  62. generic_test(GenericTestParams {
  63. clients: 5,
  64. ..Default::default()
  65. });
  66. }
  67. #[test]
  68. fn unreliable_many_clients() {
  69. init_test_log!();
  70. generic_test(GenericTestParams {
  71. clients: 5,
  72. unreliable: true,
  73. ..Default::default()
  74. });
  75. }
  76. #[test]
  77. fn unreliable_one_key_many_clients() -> Result {
  78. init_test_log!();
  79. const SERVERS: usize = 5;
  80. let cfg = Arc::new(make_config(SERVERS, true, 0));
  81. defer!(cfg.clean_up());
  82. let mut clerk = cfg.make_clerk();
  83. cfg.begin("Test: concurrent append to same key, unreliable (3A)");
  84. clerk.put("k", "");
  85. const CLIENTS: usize = 5;
  86. const ATTEMPTS: usize = 10;
  87. let logger = LocalLogger::inherit();
  88. let client_results =
  89. spawn_clients(cfg.clone(), CLIENTS, move |index, mut clerk| {
  90. logger.clone().attach();
  91. for i in 0..ATTEMPTS {
  92. clerk.append("k", format!("({}, {})", index, i));
  93. }
  94. });
  95. for client_result in client_results {
  96. client_result.join().expect("Client should never fail");
  97. }
  98. let value = clerk.get("k").expect("Key should exist");
  99. check_concurrent_results(value, CLIENTS, vec![ATTEMPTS; CLIENTS])
  100. }
  101. #[test]
  102. fn one_partition() -> Result {
  103. init_test_log!();
  104. const SERVERS: usize = 5;
  105. let cfg = Arc::new(make_config(SERVERS, false, 0));
  106. defer!(cfg.clean_up());
  107. cfg.begin("Test: progress in majority (3A)");
  108. const KEY: &str = "1";
  109. let mut clerk = cfg.make_clerk();
  110. clerk.put(KEY, "13");
  111. let (majority, minority) = cfg.partition();
  112. assert!(minority.len() < majority.len());
  113. assert_eq!(minority.len() + majority.len(), SERVERS);
  114. let mut clerk_majority = cfg.make_limited_clerk(&majority);
  115. let mut clerk_minority1 = cfg.make_limited_clerk(&minority);
  116. let mut clerk_minority2 = cfg.make_limited_clerk(&minority);
  117. clerk_majority.put(KEY, "14");
  118. assert_eq!(clerk_majority.get(KEY), Some("14".to_owned()));
  119. cfg.begin("Test: no progress in minority (3A)");
  120. let counter = Arc::new(AtomicUsize::new(0));
  121. let counter1 = counter.clone();
  122. std::thread::spawn(move || {
  123. clerk_minority1.put(KEY, "15");
  124. counter1.fetch_or(1, Ordering::SeqCst);
  125. });
  126. let counter2 = counter.clone();
  127. std::thread::spawn(move || {
  128. clerk_minority2.get(KEY);
  129. counter2.fetch_or(2, Ordering::SeqCst);
  130. });
  131. sleep_millis(1000);
  132. assert_eq!(counter.load(Ordering::SeqCst), 0);
  133. assert_eq!(clerk_majority.get(KEY), Some("14".to_owned()));
  134. clerk_majority.put(KEY, "16");
  135. assert_eq!(clerk_majority.get(KEY), Some("16".to_owned()));
  136. cfg.begin("Test: completion after heal (3A)");
  137. cfg.connect_all();
  138. cfg.connect_all_clerks();
  139. sleep_election_timeouts(1);
  140. for _ in 0..100 {
  141. sleep_millis(60);
  142. if counter.load(Ordering::SeqCst) == 3 {
  143. break;
  144. }
  145. }
  146. assert_eq!(counter.load(Ordering::SeqCst), 3);
  147. assert_eq!(clerk.get(KEY), Some("15".to_owned()));
  148. Ok(())
  149. }
  150. #[test]
  151. fn many_partitions_one_client() {
  152. init_test_log!();
  153. generic_test(GenericTestParams {
  154. clients: 1,
  155. partition: true,
  156. ..Default::default()
  157. });
  158. }
  159. #[test]
  160. fn many_partitions_many_client() {
  161. init_test_log!();
  162. generic_test(GenericTestParams {
  163. clients: 5,
  164. partition: true,
  165. ..Default::default()
  166. });
  167. }
  168. #[test]
  169. fn persist_one_client() {
  170. init_test_log!();
  171. generic_test(GenericTestParams {
  172. clients: 1,
  173. crash: true,
  174. ..Default::default()
  175. });
  176. }
  177. #[test]
  178. fn persist_concurrent() {
  179. init_test_log!();
  180. generic_test(GenericTestParams {
  181. clients: 5,
  182. crash: true,
  183. ..Default::default()
  184. });
  185. }
  186. #[test]
  187. fn persist_concurrent_unreliable() {
  188. init_test_log!();
  189. generic_test(GenericTestParams {
  190. clients: 5,
  191. unreliable: true,
  192. crash: true,
  193. ..Default::default()
  194. });
  195. }
  196. #[test]
  197. fn persist_partition() {
  198. init_test_log!();
  199. generic_test(GenericTestParams {
  200. clients: 5,
  201. partition: true,
  202. crash: true,
  203. ..Default::default()
  204. });
  205. }
  206. #[test]
  207. fn persist_partition_unreliable() {
  208. init_test_log!();
  209. generic_test(GenericTestParams {
  210. clients: 5,
  211. unreliable: true,
  212. partition: true,
  213. crash: true,
  214. min_ops: Some(5),
  215. ..Default::default()
  216. });
  217. }
  218. #[test]
  219. fn linearizability() {
  220. init_test_log!();
  221. generic_test(GenericTestParams {
  222. clients: 15,
  223. unreliable: true,
  224. partition: true,
  225. crash: true,
  226. maxraftstate: None,
  227. min_ops: Some(0),
  228. test_linearizability: true,
  229. });
  230. }