service_test.rs 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. extern crate kvraft;
  2. #[macro_use]
  3. extern crate scopeguard;
  4. use std::sync::atomic::{AtomicUsize, Ordering};
  5. use std::sync::Arc;
  6. use kvraft::testing_utils::config::{
  7. make_config, sleep_election_timeouts, sleep_millis,
  8. };
  9. use kvraft::testing_utils::generic_test::{
  10. generic_test, spawn_clients, GenericTestParams,
  11. };
  12. use test_utils::init_test_log;
  13. use test_utils::thread_local_logger::LocalLogger;
  14. type Result = std::result::Result<(), String>;
  15. fn check_concurrent_results(
  16. value: String,
  17. clients: usize,
  18. expected: Vec<usize>,
  19. ) -> Result {
  20. if !value.starts_with('(') || !value.ends_with(')') {
  21. return Err(format!("Malformed value string {}", value));
  22. }
  23. let inner_value = &value[1..value.len() - 1];
  24. let mut progress = vec![0; clients];
  25. for pair_str in inner_value.split(")(") {
  26. let mut nums = vec![];
  27. for num_str in pair_str.split(", ") {
  28. let num: usize = num_str.parse().map_err(|_e| {
  29. format!("Parsing '{:?}' failed within '{:?}'", num_str, value)
  30. })?;
  31. nums.push(num);
  32. }
  33. if nums.len() != 2 {
  34. return Err(format!(
  35. concat!(
  36. "More than two numbers in the same group when",
  37. " parsing '{:?}' failed within '{:?}'",
  38. ),
  39. pair_str, value,
  40. ));
  41. }
  42. let (client, curr) = (nums[0], nums[1]);
  43. if progress[client] != curr {
  44. return Err(format!(
  45. "Client {} failed, expecting {}, got {}, others are {:?} in {}",
  46. client, progress[client], curr, progress, value,
  47. ));
  48. }
  49. progress[client] = curr + 1;
  50. }
  51. assert_eq!(progress, expected, "Expecting progress in {}", value);
  52. Ok(())
  53. }
  54. #[test]
  55. fn basic_service() {
  56. init_test_log!();
  57. generic_test(GenericTestParams {
  58. clients: 1,
  59. ..Default::default()
  60. });
  61. }
  62. #[test]
  63. fn concurrent_client() {
  64. init_test_log!();
  65. generic_test(GenericTestParams {
  66. clients: 5,
  67. ..Default::default()
  68. });
  69. }
  70. #[test]
  71. fn unreliable_many_clients() {
  72. init_test_log!();
  73. generic_test(GenericTestParams {
  74. clients: 5,
  75. unreliable: true,
  76. ..Default::default()
  77. });
  78. }
  79. #[test]
  80. fn unreliable_one_key_many_clients() -> Result {
  81. init_test_log!();
  82. const SERVERS: usize = 5;
  83. let cfg = Arc::new(make_config(SERVERS, true, 0));
  84. defer!(cfg.clean_up());
  85. let mut clerk = cfg.make_clerk();
  86. cfg.begin("Test: concurrent append to same key, unreliable (3A)");
  87. clerk.put("k", "");
  88. const CLIENTS: usize = 5;
  89. const ATTEMPTS: usize = 10;
  90. let logger = LocalLogger::inherit();
  91. let client_results =
  92. spawn_clients(cfg.clone(), CLIENTS, move |index, mut clerk| {
  93. logger.clone().attach();
  94. for i in 0..ATTEMPTS {
  95. clerk.append("k", format!("({}, {})", index, i));
  96. }
  97. });
  98. for client_result in client_results {
  99. client_result.join().expect("Client should never fail");
  100. }
  101. let value = clerk.get("k").expect("Key should exist");
  102. check_concurrent_results(value, CLIENTS, vec![ATTEMPTS; CLIENTS])
  103. }
  104. #[test]
  105. fn one_partition() -> Result {
  106. init_test_log!();
  107. const SERVERS: usize = 5;
  108. let cfg = Arc::new(make_config(SERVERS, false, 0));
  109. defer!(cfg.clean_up());
  110. cfg.begin("Test: progress in majority (3A)");
  111. const KEY: &str = "1";
  112. let mut clerk = cfg.make_clerk();
  113. clerk.put(KEY, "13");
  114. let (majority, minority) = cfg.partition();
  115. assert!(minority.len() < majority.len());
  116. assert_eq!(minority.len() + majority.len(), SERVERS);
  117. let mut clerk_majority = cfg.make_limited_clerk(&majority);
  118. let mut clerk_minority1 = cfg.make_limited_clerk(&minority);
  119. let mut clerk_minority2 = cfg.make_limited_clerk(&minority);
  120. clerk_majority.put(KEY, "14");
  121. assert_eq!(clerk_majority.get(KEY), Some("14".to_owned()));
  122. cfg.begin("Test: no progress in minority (3A)");
  123. let counter = Arc::new(AtomicUsize::new(0));
  124. let counter1 = counter.clone();
  125. std::thread::spawn(move || {
  126. clerk_minority1.put(KEY, "15");
  127. counter1.fetch_or(1, Ordering::SeqCst);
  128. });
  129. let counter2 = counter.clone();
  130. std::thread::spawn(move || {
  131. clerk_minority2.get(KEY);
  132. counter2.fetch_or(2, Ordering::SeqCst);
  133. });
  134. sleep_millis(1000);
  135. assert_eq!(counter.load(Ordering::SeqCst), 0);
  136. assert_eq!(clerk_majority.get(KEY), Some("14".to_owned()));
  137. clerk_majority.put(KEY, "16");
  138. assert_eq!(clerk_majority.get(KEY), Some("16".to_owned()));
  139. cfg.begin("Test: completion after heal (3A)");
  140. cfg.connect_all();
  141. cfg.connect_all_clerks();
  142. sleep_election_timeouts(1);
  143. for _ in 0..100 {
  144. sleep_millis(60);
  145. if counter.load(Ordering::SeqCst) == 3 {
  146. break;
  147. }
  148. }
  149. assert_eq!(counter.load(Ordering::SeqCst), 3);
  150. assert_eq!(clerk.get(KEY), Some("15".to_owned()));
  151. Ok(())
  152. }
  153. #[test]
  154. fn many_partitions_one_client() {
  155. init_test_log!();
  156. generic_test(GenericTestParams {
  157. clients: 1,
  158. partition: true,
  159. ..Default::default()
  160. });
  161. }
  162. #[test]
  163. fn many_partitions_many_client() {
  164. init_test_log!();
  165. generic_test(GenericTestParams {
  166. clients: 5,
  167. partition: true,
  168. ..Default::default()
  169. });
  170. }
  171. #[test]
  172. fn persist_one_client() {
  173. init_test_log!();
  174. generic_test(GenericTestParams {
  175. clients: 1,
  176. crash: true,
  177. ..Default::default()
  178. });
  179. }
  180. #[test]
  181. fn persist_concurrent() {
  182. init_test_log!();
  183. generic_test(GenericTestParams {
  184. clients: 5,
  185. crash: true,
  186. ..Default::default()
  187. });
  188. }
  189. #[test]
  190. fn persist_concurrent_unreliable() {
  191. init_test_log!();
  192. generic_test(GenericTestParams {
  193. clients: 5,
  194. unreliable: true,
  195. crash: true,
  196. ..Default::default()
  197. });
  198. }
  199. #[test]
  200. fn persist_partition() {
  201. init_test_log!();
  202. generic_test(GenericTestParams {
  203. clients: 5,
  204. partition: true,
  205. crash: true,
  206. ..Default::default()
  207. });
  208. }
  209. #[test]
  210. fn persist_partition_unreliable() {
  211. init_test_log!();
  212. generic_test(GenericTestParams {
  213. clients: 5,
  214. unreliable: true,
  215. partition: true,
  216. crash: true,
  217. min_ops: Some(5),
  218. ..Default::default()
  219. });
  220. }
  221. #[test]
  222. fn linearizability() {
  223. init_test_log!();
  224. generic_test(GenericTestParams {
  225. clients: 15,
  226. unreliable: true,
  227. partition: true,
  228. crash: true,
  229. maxraftstate: None,
  230. min_ops: Some(0),
  231. test_linearizability: true,
  232. });
  233. }