service_test.rs 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. use std::sync::atomic::{AtomicUsize, Ordering};
  2. use std::sync::Arc;
  3. use scopeguard::defer;
  4. use test_configs::kvraft::config::make_config;
  5. use test_configs::kvraft::generic_test::{
  6. generic_test, spawn_clients, GenericTestParams,
  7. };
  8. use test_configs::utils::{sleep_election_timeouts, sleep_millis};
  9. use test_utils::init_test_log;
  10. use test_utils::thread_local_logger::LocalLogger;
  /// Outcome of a test helper or test body: `Ok(())` on success, otherwise a
  /// human-readable description of what went wrong.
  type Result = ::std::result::Result<(), String>;
  /// Checks that a concatenated append log is consistent with every client
  /// having appended its entries in order.
  ///
  /// `value` must be a sequence of groups such as `(0, 0)(1, 0)(0, 1)`, where
  /// each group is `(client, sequence)`. For every client the sequence numbers
  /// have to appear as 0, 1, 2, ... without gaps, duplicates or reordering;
  /// groups that belong to different clients may interleave freely.
  ///
  /// * `value` - the raw string read back from the store.
  /// * `clients` - number of clients; valid client indices are `0..clients`.
  /// * `expected` - how many entries each client must have appended in total.
  ///
  /// # Errors
  ///
  /// Returns a descriptive message when `value` is not wrapped in parentheses,
  /// a number fails to parse, a group does not hold exactly two numbers, a
  /// client index is not below `clients`, or a client's sequence number is not
  /// the next one expected for that client.
  ///
  /// # Panics
  ///
  /// Panics if, after the whole value has been consumed, the per-client entry
  /// counts differ from `expected`.
  fn check_concurrent_results(
      value: String,
      clients: usize,
      expected: Vec<usize>,
  ) -> Result {
      // Drop the outermost parentheses so that splitting on ")(" yields the
      // bare "client, sequence" groups.
      let inner_value = value
          .strip_prefix('(')
          .and_then(|rest| rest.strip_suffix(')'))
          .ok_or_else(|| format!("Malformed value string {}", value))?;

      // progress[c] is the sequence number client `c` must append next.
      let mut progress = vec![0; clients];
      for pair_str in inner_value.split(")(") {
          let nums = pair_str
              .split(", ")
              .map(|num_str| {
                  num_str.parse::<usize>().map_err(|_e| {
                      format!(
                          "Parsing '{:?}' failed within '{:?}'",
                          num_str, value
                      )
                  })
              })
              .collect::<std::result::Result<Vec<_>, String>>()?;

          let (client, curr) = match nums.as_slice() {
              [client, curr] => (*client, *curr),
              _ => {
                  return Err(format!(
                      concat!(
                          "Expecting exactly two numbers in group '{:?}',",
                          " found {} within '{:?}'",
                      ),
                      pair_str,
                      nums.len(),
                      value,
                  ));
              }
          };

          // A client index outside `0..clients` is malformed input; report it
          // like every other parse failure instead of panicking on the index.
          let next = *progress.get(client).ok_or_else(|| {
              format!(
                  "Client {} is out of range, only {} clients are expected in {}",
                  client, clients, value,
              )
          })?;
          if next != curr {
              return Err(format!(
                  "Client {} failed, expecting {}, got {}, others are {:?} in {}",
                  client, next, curr, progress, value,
              ));
          }
          progress[client] = curr + 1;
      }

      assert_eq!(progress, expected, "Expecting progress in {}", value);
      Ok(())
  }
  /// Runs the shared `generic_test` scenario with a single client and every
  /// other parameter left at its default.
  #[test]
  fn basic_service() {
      init_test_log!();
      let params = GenericTestParams {
          clients: 1,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the shared `generic_test` scenario with five clients and every
  /// other parameter left at its default.
  #[test]
  fn concurrent_client() {
      init_test_log!();
      let params = GenericTestParams {
          clients: 5,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the shared `generic_test` scenario with five clients and the
  /// `unreliable` flag switched on; the remaining parameters use defaults.
  #[test]
  fn unreliable_many_clients() {
      init_test_log!();
      let params = GenericTestParams {
          clients: 5,
          unreliable: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Has `CLIENTS` clients each append `ATTEMPTS` entries of the form
  /// `(client, attempt)` to the same key, then reads the key back and checks
  /// with `check_concurrent_results` that every client's entries are present
  /// and in order.
  ///
  /// The configuration is built with `make_config(SERVERS, true, 0)`.
  // NOTE(review): the `true` argument presumably selects an unreliable
  // network, going by the test name and banner text; confirm against
  // `make_config`.
  #[test]
  fn unreliable_one_key_many_clients() -> Result {
      const SERVERS: usize = 5;
      const CLIENTS: usize = 5;
      const ATTEMPTS: usize = 10;

      init_test_log!();

      let cfg = Arc::new(make_config(SERVERS, true, 0));
      defer!(cfg.clean_up());

      let mut clerk = cfg.make_clerk();

      cfg.begin("Test: concurrent append to same key, unreliable (3A)");

      // Start from an empty value so the final string holds only the appends.
      clerk.put("k", "");

      let logger = LocalLogger::inherit();
      let workers =
          spawn_clients(Arc::clone(&cfg), CLIENTS, move |index, mut clerk| {
              logger.clone().attach();
              (0..ATTEMPTS).for_each(|attempt| {
                  clerk.append("k", format!("({}, {})", index, attempt));
              });
          });

      // Every client has to finish before the value is inspected.
      workers.into_iter().for_each(|worker| {
          worker.join().expect("Client should never fail");
      });

      let final_value = clerk.get("k").expect("Key should exist");
      check_concurrent_results(final_value, CLIENTS, vec![ATTEMPTS; CLIENTS])
  }
  /// Exercises a partitioned cluster in three phases:
  ///
  /// 1. A clerk limited to the majority side can still `put` and `get`.
  /// 2. A `put` and a `get` issued through clerks limited to the minority side
  ///    do not complete within one second, while the majority side keeps
  ///    accepting writes.
  /// 3. After everything is reconnected, both pending minority operations
  ///    finish and the key ends up holding the minority writer's value.
  ///
  /// Completion of the two background operations is tracked in a shared
  /// bitmask: bit `1` is set once the `put` returns, bit `2` once the `get`
  /// returns, so `0` means "neither finished" and `3` means "both finished".
  #[test]
  fn one_partition() -> Result {
      const SERVERS: usize = 5;
      const KEY: &str = "1";

      init_test_log!();

      let cfg = Arc::new(make_config(SERVERS, false, 0));
      defer!(cfg.clean_up());

      cfg.begin("Test: progress in majority (3A)");

      let mut clerk = cfg.make_clerk();
      clerk.put(KEY, "13");

      let (majority, minority) = cfg.partition();
      assert!(minority.len() < majority.len());
      assert_eq!(minority.len() + majority.len(), SERVERS);

      let mut clerk_majority = cfg.make_limited_clerk(&majority);
      let mut minority_writer = cfg.make_limited_clerk(&minority);
      let mut minority_reader = cfg.make_limited_clerk(&minority);

      clerk_majority.put(KEY, "14");
      assert_eq!(clerk_majority.get(KEY), Some(String::from("14")));

      cfg.begin("Test: no progress in minority (3A)");

      let counter = Arc::new(AtomicUsize::new(0));

      // Background `put` through the minority side; sets bit 1 when it returns.
      let put_done = Arc::clone(&counter);
      let put_logger = LocalLogger::inherit();
      std::thread::spawn(move || {
          put_logger.attach();
          minority_writer.put(KEY, "15");
          put_done.fetch_or(1, Ordering::AcqRel);
      });

      // Background `get` through the minority side; sets bit 2 when it returns.
      let get_done = Arc::clone(&counter);
      let get_logger = LocalLogger::inherit();
      std::thread::spawn(move || {
          get_logger.attach();
          minority_reader.get(KEY);
          get_done.fetch_or(2, Ordering::AcqRel);
      });

      // Give the minority operations a full second; neither may have finished.
      sleep_millis(1000);
      assert_eq!(counter.load(Ordering::Acquire), 0);

      // Meanwhile the majority side still sees its own writes and accepts more.
      assert_eq!(clerk_majority.get(KEY), Some(String::from("14")));
      clerk_majority.put(KEY, "16");
      assert_eq!(clerk_majority.get(KEY), Some(String::from("16")));

      cfg.begin("Test: completion after heal (3A)");

      cfg.connect_all();
      cfg.connect_all_clerks();

      sleep_election_timeouts(1);

      // Poll up to 100 times, 60 ms apart, stopping as soon as both background
      // operations have reported completion.
      let _ = (0..100).any(|_| {
          sleep_millis(60);
          counter.load(Ordering::Acquire) == 3
      });

      assert_eq!(counter.load(Ordering::Acquire), 3);
      assert_eq!(clerk.get(KEY), Some(String::from("15")));

      Ok(())
  }
  /// Runs the shared `generic_test` scenario with a single client and the
  /// `partition` flag switched on; the remaining parameters use defaults.
  #[test]
  fn many_partitions_one_client() {
      init_test_log!();
      let params = GenericTestParams {
          clients: 1,
          partition: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the shared `generic_test` scenario with five clients and the
  /// `partition` flag switched on; the remaining parameters use defaults.
  #[test]
  fn many_partitions_many_client() {
      init_test_log!();
      let params = GenericTestParams {
          clients: 5,
          partition: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the shared `generic_test` scenario with a single client and the
  /// `crash` flag switched on; the remaining parameters use defaults.
  #[test]
  fn persist_one_client() {
      init_test_log!();
      let params = GenericTestParams {
          clients: 1,
          crash: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the shared `generic_test` scenario with five clients and the
  /// `crash` flag switched on; the remaining parameters use defaults.
  #[test]
  fn persist_concurrent() {
      init_test_log!();
      let params = GenericTestParams {
          clients: 5,
          crash: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the shared `generic_test` scenario with five clients and both the
  /// `unreliable` and `crash` flags switched on; the remaining parameters use
  /// defaults.
  #[test]
  fn persist_concurrent_unreliable() {
      init_test_log!();
      let params = GenericTestParams {
          clients: 5,
          unreliable: true,
          crash: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the shared `generic_test` scenario with five clients and both the
  /// `partition` and `crash` flags switched on; the remaining parameters use
  /// defaults.
  #[test]
  fn persist_partition() {
      init_test_log!();
      let params = GenericTestParams {
          clients: 5,
          partition: true,
          crash: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the shared `generic_test` scenario with five clients, the
  /// `unreliable`, `partition` and `crash` flags all switched on, and
  /// `min_ops` set to `Some(5)`; the remaining parameters use defaults.
  #[test]
  fn persist_partition_unreliable() {
      init_test_log!();
      let params = GenericTestParams {
          clients: 5,
          unreliable: true,
          partition: true,
          crash: true,
          min_ops: Some(5),
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the shared `generic_test` scenario with fifteen clients, the
  /// `unreliable`, `partition` and `crash` flags all switched on, and
  /// `test_linearizability` enabled.
  ///
  /// Every field of `GenericTestParams` is spelled out here rather than
  /// falling back to `Default`: `maxraftstate` is `None` and `min_ops` is
  /// `Some(0)`.
  #[test]
  fn linearizability() {
      init_test_log!();
      let params = GenericTestParams {
          clients: 15,
          unreliable: true,
          partition: true,
          crash: true,
          maxraftstate: None,
          min_ops: Some(0),
          test_linearizability: true,
      };
      generic_test(params);
  }