service_test.rs 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. #![allow(clippy::uninlined_format_args)]
  2. use std::sync::atomic::{AtomicUsize, Ordering};
  3. use std::sync::Arc;
  4. use scopeguard::defer;
  5. use test_configs::kvraft::config::make_config;
  6. use test_configs::kvraft::generic_test::{
  7. generic_test, spawn_clients, GenericTestParams,
  8. };
  9. use test_configs::utils::{sleep_election_timeouts, sleep_millis};
  10. use test_utils::init_test_log;
  11. use test_utils::thread_local_logger::LocalLogger;
  12. type Result = std::result::Result<(), String>;
  13. fn check_concurrent_results(
  14. value: String,
  15. clients: usize,
  16. expected: Vec<usize>,
  17. ) -> Result {
  18. if !value.starts_with('(') || !value.ends_with(')') {
  19. return Err(format!("Malformed value string {}", value));
  20. }
  21. let inner_value = &value[1..value.len() - 1];
  22. let mut progress = vec![0; clients];
  23. for pair_str in inner_value.split(")(") {
  24. let mut nums = vec![];
  25. for num_str in pair_str.split(", ") {
  26. let num: usize = num_str.parse().map_err(|_e| {
  27. format!("Parsing '{:?}' failed within '{:?}'", num_str, value)
  28. })?;
  29. nums.push(num);
  30. }
  31. if nums.len() != 2 {
  32. return Err(format!(
  33. concat!(
  34. "More than two numbers in the same group when",
  35. " parsing '{:?}' failed within '{:?}'",
  36. ),
  37. pair_str, value,
  38. ));
  39. }
  40. let (client, curr) = (nums[0], nums[1]);
  41. if progress[client] != curr {
  42. return Err(format!(
  43. "Client {} failed, expecting {}, got {}, others are {:?} in {}",
  44. client, progress[client], curr, progress, value,
  45. ));
  46. }
  47. progress[client] = curr + 1;
  48. }
  49. assert_eq!(progress, expected, "Expecting progress in {}", value);
  50. Ok(())
  51. }
  52. #[test]
  53. fn basic_service() {
  54. init_test_log!();
  55. generic_test(GenericTestParams {
  56. clients: 1,
  57. ..Default::default()
  58. });
  59. }
  60. #[test]
  61. fn concurrent_client() {
  62. init_test_log!();
  63. generic_test(GenericTestParams {
  64. clients: 5,
  65. ..Default::default()
  66. });
  67. }
  68. #[test]
  69. fn unreliable_many_clients() {
  70. init_test_log!();
  71. generic_test(GenericTestParams {
  72. clients: 5,
  73. unreliable: true,
  74. ..Default::default()
  75. });
  76. }
  77. #[test]
  78. fn unreliable_one_key_many_clients() -> Result {
  79. init_test_log!();
  80. const SERVERS: usize = 5;
  81. let cfg = Arc::new(make_config(SERVERS, true, 0));
  82. defer!(cfg.clean_up());
  83. let mut clerk = cfg.make_clerk();
  84. cfg.begin("Test: concurrent append to same key, unreliable (3A)");
  85. clerk.put("k", "");
  86. const CLIENTS: usize = 5;
  87. const ATTEMPTS: usize = 10;
  88. let logger = LocalLogger::inherit();
  89. let client_results =
  90. spawn_clients(cfg.clone(), CLIENTS, move |index, mut clerk| {
  91. logger.clone().attach();
  92. for i in 0..ATTEMPTS {
  93. clerk.append("k", format!("({}, {})", index, i));
  94. }
  95. });
  96. for client_result in client_results {
  97. client_result.join().expect("Client should never fail");
  98. }
  99. let value = clerk.get("k").expect("Key should exist");
  100. check_concurrent_results(value, CLIENTS, vec![ATTEMPTS; CLIENTS])
  101. }
  102. #[test]
  103. fn one_partition() -> Result {
  104. init_test_log!();
  105. const SERVERS: usize = 5;
  106. let cfg = Arc::new(make_config(SERVERS, false, 0));
  107. defer!(cfg.clean_up());
  108. cfg.begin("Test: progress in majority (3A)");
  109. const KEY: &str = "1";
  110. let mut clerk = cfg.make_clerk();
  111. clerk.put(KEY, "13");
  112. let (majority, minority) = cfg.partition();
  113. assert!(minority.len() < majority.len());
  114. assert_eq!(minority.len() + majority.len(), SERVERS);
  115. let mut clerk_majority = cfg.make_limited_clerk(&majority);
  116. let mut clerk_minority1 = cfg.make_limited_clerk(&minority);
  117. let mut clerk_minority2 = cfg.make_limited_clerk(&minority);
  118. clerk_majority.put(KEY, "14");
  119. assert_eq!(clerk_majority.get(KEY), Some("14".to_owned()));
  120. cfg.begin("Test: no progress in minority (3A)");
  121. let counter = Arc::new(AtomicUsize::new(0));
  122. let counter1 = counter.clone();
  123. let logger = LocalLogger::inherit();
  124. std::thread::spawn(move || {
  125. logger.attach();
  126. clerk_minority1.put(KEY, "15");
  127. counter1.fetch_or(1, Ordering::AcqRel);
  128. });
  129. let counter2 = counter.clone();
  130. let logger = LocalLogger::inherit();
  131. std::thread::spawn(move || {
  132. logger.attach();
  133. clerk_minority2.get(KEY);
  134. counter2.fetch_or(2, Ordering::AcqRel);
  135. });
  136. sleep_millis(1000);
  137. assert_eq!(counter.load(Ordering::Acquire), 0);
  138. assert_eq!(clerk_majority.get(KEY), Some("14".to_owned()));
  139. clerk_majority.put(KEY, "16");
  140. assert_eq!(clerk_majority.get(KEY), Some("16".to_owned()));
  141. cfg.begin("Test: completion after heal (3A)");
  142. cfg.connect_all();
  143. cfg.connect_all_clerks();
  144. sleep_election_timeouts(1);
  145. for _ in 0..100 {
  146. sleep_millis(60);
  147. if counter.load(Ordering::Acquire) == 3 {
  148. break;
  149. }
  150. }
  151. assert_eq!(counter.load(Ordering::Acquire), 3);
  152. assert_eq!(clerk.get(KEY), Some("15".to_owned()));
  153. Ok(())
  154. }
  155. #[test]
  156. fn many_partitions_one_client() {
  157. init_test_log!();
  158. generic_test(GenericTestParams {
  159. clients: 1,
  160. partition: true,
  161. ..Default::default()
  162. });
  163. }
  164. #[test]
  165. fn many_partitions_many_client() {
  166. init_test_log!();
  167. generic_test(GenericTestParams {
  168. clients: 5,
  169. partition: true,
  170. ..Default::default()
  171. });
  172. }
  173. #[test]
  174. fn persist_one_client() {
  175. init_test_log!();
  176. generic_test(GenericTestParams {
  177. clients: 1,
  178. crash: true,
  179. ..Default::default()
  180. });
  181. }
  182. #[test]
  183. fn persist_concurrent() {
  184. init_test_log!();
  185. generic_test(GenericTestParams {
  186. clients: 5,
  187. crash: true,
  188. ..Default::default()
  189. });
  190. }
  191. #[test]
  192. fn persist_concurrent_unreliable() {
  193. init_test_log!();
  194. generic_test(GenericTestParams {
  195. clients: 5,
  196. unreliable: true,
  197. crash: true,
  198. ..Default::default()
  199. });
  200. }
  201. #[test]
  202. fn persist_partition() {
  203. init_test_log!();
  204. generic_test(GenericTestParams {
  205. clients: 5,
  206. partition: true,
  207. crash: true,
  208. ..Default::default()
  209. });
  210. }
  211. #[test]
  212. fn persist_partition_unreliable() {
  213. init_test_log!();
  214. generic_test(GenericTestParams {
  215. clients: 5,
  216. unreliable: true,
  217. partition: true,
  218. crash: true,
  219. min_ops: Some(5),
  220. ..Default::default()
  221. });
  222. }
  223. #[test]
  224. fn linearizability() {
  225. init_test_log!();
  226. generic_test(GenericTestParams {
  227. clients: 15,
  228. unreliable: true,
  229. partition: true,
  230. crash: true,
  231. maxraftstate: None,
  232. min_ops: Some(0),
  233. test_linearizability: true,
  234. });
  235. }