service_test.rs 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. use std::sync::atomic::{AtomicUsize, Ordering};
  2. use std::sync::Arc;
  3. use scopeguard::defer;
  4. use test_configs::kvraft::config::{
  5. make_config, sleep_election_timeouts, sleep_millis,
  6. };
  7. use test_configs::kvraft::generic_test::{
  8. generic_test, spawn_clients, GenericTestParams,
  9. };
  10. use test_utils::init_test_log;
  11. use test_utils::thread_local_logger::LocalLogger;
  /// Outcome of a test or of a test helper: `Ok(())` on success, otherwise a message that
  /// describes the failure.
  type Result = std::result::Result<(), String>;

  /// Verifies a value that was built by several clients appending to the same key.
  ///
  /// `value` must be a concatenation of groups shaped like `(client, seq)`, for example
  /// `(0, 0)(1, 0)(0, 1)`. Groups of different clients may interleave in any order, but the
  /// sequence numbers of each individual client must show up as 0, 1, 2, ... without gaps,
  /// duplicates or reordering.
  ///
  /// * `value` - the string to verify.
  /// * `clients` - number of clients; valid client ids are `0..clients`.
  /// * `expected` - for each client id, the number of groups that client must have contributed.
  ///
  /// # Errors
  ///
  /// Returns a descriptive message when
  /// * `value` is not wrapped in `(` and `)`,
  /// * a number inside a group cannot be parsed as `usize` (this includes the empty value `()`),
  /// * a group does not hold exactly two numbers,
  /// * a client id is not smaller than `clients`,
  /// * a sequence number is not the next one expected for its client.
  ///
  /// # Panics
  ///
  /// Panics when every group is well formed but the final per-client counts differ from
  /// `expected`.
  fn check_concurrent_results(
      value: String,
      clients: usize,
      expected: Vec<usize>,
  ) -> Result {
      // Drop the outermost parentheses so that splitting on ")(" yields bare "client, seq" pairs.
      let inner_value = value
          .strip_prefix('(')
          .and_then(|rest| rest.strip_suffix(')'))
          .ok_or_else(|| format!("Malformed value string {}", value))?;

      // progress[c] is the next sequence number expected from client `c`.
      let mut progress = vec![0; clients];
      for pair_str in inner_value.split(")(") {
          // `Result` is shadowed by the alias above, hence the fully qualified name.
          let nums = pair_str
              .split(", ")
              .map(|num_str| {
                  num_str.parse::<usize>().map_err(|_e| {
                      format!("Parsing '{:?}' failed within '{:?}'", num_str, value)
                  })
              })
              .collect::<std::result::Result<Vec<usize>, String>>()?;

          let (client, curr) = match nums.as_slice() {
              &[client, curr] => (client, curr),
              _ => {
                  return Err(format!(
                      concat!(
                          "Expecting exactly two numbers in the same group, found {}",
                          " when parsing '{:?}' within '{:?}'",
                      ),
                      nums.len(),
                      pair_str,
                      value,
                  ));
              }
          };

          // An unknown client id is a property of the input, so report it instead of
          // panicking on an out-of-bounds index.
          let next = match progress.get(client) {
              Some(next) => *next,
              None => {
                  return Err(format!(
                      "Client {} is out of range, expecting fewer than {} clients in {}",
                      client, clients, value,
                  ));
              }
          };
          if next != curr {
              return Err(format!(
                  "Client {} failed, expecting {}, got {}, others are {:?} in {}",
                  client, next, curr, progress, value,
              ));
          }
          progress[client] = curr + 1;
      }

      assert_eq!(progress, expected, "Expecting progress in {}", value);
      Ok(())
  }
  /// Runs `generic_test` with a single client; every other parameter keeps its `Default` value.
  #[test]
  fn basic_service() {
      init_test_log!();

      let params = GenericTestParams {
          clients: 1,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with five clients; every other parameter keeps its `Default` value.
  #[test]
  fn concurrent_client() {
      init_test_log!();

      let params = GenericTestParams {
          clients: 5,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with five clients and the `unreliable` flag set; the remaining
  /// parameters keep their `Default` values.
  #[test]
  fn unreliable_many_clients() {
      init_test_log!();

      let params = GenericTestParams {
          clients: 5,
          unreliable: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Has several clients append to one shared key and checks that no append of any client is
  /// lost, duplicated or reordered.
  ///
  /// The key "k" is first set to the empty string. Each of the `CLIENTS` spawned clients then
  /// appends `ATTEMPTS` groups of the form `(client, seq)`, and the final value is handed to
  /// `check_concurrent_results`.
  ///
  /// NOTE(review): the `true` passed to `make_config` presumably selects an unreliable network,
  /// as the test banner suggests — confirm against `make_config`.
  #[test]
  fn unreliable_one_key_many_clients() -> Result {
      const SERVERS: usize = 5;
      const CLIENTS: usize = 5;
      const ATTEMPTS: usize = 10;

      init_test_log!();

      let cfg = Arc::new(make_config(SERVERS, true, 0));
      defer!(cfg.clean_up());

      let mut clerk = cfg.make_clerk();
      cfg.begin("Test: concurrent append to same key, unreliable (3A)");
      clerk.put("k", "");

      let logger = LocalLogger::inherit();
      let handles = spawn_clients(
          Arc::clone(&cfg),
          CLIENTS,
          move |client_id, mut client_clerk| {
              // Each client attaches its own copy of the logger taken from the test thread.
              logger.clone().attach();
              for seq in 0..ATTEMPTS {
                  client_clerk.append("k", format!("({}, {})", client_id, seq));
              }
          },
      );
      for handle in handles {
          handle.join().expect("Client should never fail");
      }

      let final_value = clerk.get("k").expect("Key should exist");
      let expected_progress = vec![ATTEMPTS; CLIENTS];
      check_concurrent_results(final_value, CLIENTS, expected_progress)
  }
  /// Checks request handling while the servers are split into a majority and a minority.
  ///
  /// Steps, in order:
  /// 1. Write "13", partition the servers, and check that a clerk limited to the majority can
  ///    still write and read "14".
  /// 2. Start a `put` of "15" and a `get` through clerks limited to the minority, wait one
  ///    second and check that neither call has returned, while the majority clerk keeps making
  ///    progress (reads "14", writes and reads "16").
  /// 3. Reconnect everything, wait, and check that both minority calls return and that the key
  ///    ends up as "15".
  #[test]
  fn one_partition() -> Result {
      const SERVERS: usize = 5;
      const KEY: &str = "1";
      // Bits set in the shared flag word once the matching minority request has returned.
      const PUT_DONE: usize = 1;
      const GET_DONE: usize = 2;
      const ALL_DONE: usize = PUT_DONE | GET_DONE;

      init_test_log!();

      let cfg = Arc::new(make_config(SERVERS, false, 0));
      defer!(cfg.clean_up());

      cfg.begin("Test: progress in majority (3A)");

      let mut clerk = cfg.make_clerk();
      clerk.put(KEY, "13");

      let (majority, minority) = cfg.partition();
      assert!(minority.len() < majority.len());
      assert_eq!(minority.len() + majority.len(), SERVERS);

      let mut clerk_majority = cfg.make_limited_clerk(&majority);
      let mut minority_writer = cfg.make_limited_clerk(&minority);
      let mut minority_reader = cfg.make_limited_clerk(&minority);

      clerk_majority.put(KEY, "14");
      assert_eq!(clerk_majority.get(KEY), Some(String::from("14")));

      cfg.begin("Test: no progress in minority (3A)");

      let finished = Arc::new(AtomicUsize::new(0));

      let put_flag = Arc::clone(&finished);
      std::thread::spawn(move || {
          minority_writer.put(KEY, "15");
          put_flag.fetch_or(PUT_DONE, Ordering::SeqCst);
      });

      let get_flag = Arc::clone(&finished);
      std::thread::spawn(move || {
          minority_reader.get(KEY);
          get_flag.fetch_or(GET_DONE, Ordering::SeqCst);
      });

      // Neither minority request may have returned after a full second.
      sleep_millis(1000);
      assert_eq!(finished.load(Ordering::SeqCst), 0);

      assert_eq!(clerk_majority.get(KEY), Some(String::from("14")));
      clerk_majority.put(KEY, "16");
      assert_eq!(clerk_majority.get(KEY), Some(String::from("16")));

      cfg.begin("Test: completion after heal (3A)");

      cfg.connect_all();
      cfg.connect_all_clerks();

      sleep_election_timeouts(1);

      // Poll up to 100 times, 60 ms apart, until both minority requests have returned.
      let mut polls_left = 100;
      while polls_left > 0 {
          sleep_millis(60);
          if finished.load(Ordering::SeqCst) == ALL_DONE {
              break;
          }
          polls_left -= 1;
      }

      assert_eq!(finished.load(Ordering::SeqCst), 3);
      assert_eq!(clerk.get(KEY), Some(String::from("15")));

      Ok(())
  }
  /// Runs `generic_test` with a single client and the `partition` flag set; the remaining
  /// parameters keep their `Default` values.
  #[test]
  fn many_partitions_one_client() {
      init_test_log!();

      let params = GenericTestParams {
          clients: 1,
          partition: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with five clients and the `partition` flag set; the remaining
  /// parameters keep their `Default` values.
  #[test]
  fn many_partitions_many_client() {
      init_test_log!();

      let params = GenericTestParams {
          clients: 5,
          partition: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with a single client and the `crash` flag set; the remaining
  /// parameters keep their `Default` values.
  #[test]
  fn persist_one_client() {
      init_test_log!();

      let params = GenericTestParams {
          clients: 1,
          crash: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with five clients and the `crash` flag set; the remaining parameters
  /// keep their `Default` values.
  #[test]
  fn persist_concurrent() {
      init_test_log!();

      let params = GenericTestParams {
          clients: 5,
          crash: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with five clients and both the `unreliable` and `crash` flags set; the
  /// remaining parameters keep their `Default` values.
  #[test]
  fn persist_concurrent_unreliable() {
      init_test_log!();

      let params = GenericTestParams {
          clients: 5,
          unreliable: true,
          crash: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with five clients and both the `partition` and `crash` flags set; the
  /// remaining parameters keep their `Default` values.
  #[test]
  fn persist_partition() {
      init_test_log!();

      let params = GenericTestParams {
          clients: 5,
          partition: true,
          crash: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with five clients, the `unreliable`, `partition` and `crash` flags all
  /// set, and `min_ops` set to `Some(5)`; the remaining parameters keep their `Default` values.
  #[test]
  fn persist_partition_unreliable() {
      init_test_log!();

      let params = GenericTestParams {
          clients: 5,
          unreliable: true,
          partition: true,
          crash: true,
          min_ops: Some(5),
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with `test_linearizability` turned on.
  ///
  /// Every field of `GenericTestParams` is spelled out here rather than taken from `Default`:
  /// fifteen clients, the `unreliable`, `partition` and `crash` flags set, no `maxraftstate`,
  /// and `min_ops` of `Some(0)`.
  #[test]
  fn linearizability() {
      init_test_log!();

      let params = GenericTestParams {
          clients: 15,
          unreliable: true,
          partition: true,
          crash: true,
          maxraftstate: None,
          min_ops: Some(0),
          test_linearizability: true,
      };
      generic_test(params);
  }