service_test.rs 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. extern crate kvraft;
  2. #[macro_use]
  3. extern crate scopeguard;
  4. use std::sync::atomic::{AtomicUsize, Ordering};
  5. use std::sync::Arc;
  6. use kvraft::testing_utils::config::{
  7. make_config, sleep_election_timeouts, sleep_millis,
  8. };
  9. use kvraft::testing_utils::generic_test::{
  10. generic_test, spawn_clients, GenericTestParams,
  11. };
  /// Result type shared by the checks and tests in this file: `Ok(())` on
  /// success, or a message describing what went wrong.
  type Result = std::result::Result<(), String>;

  /// Validates the value produced by concurrent appends to a single key.
  ///
  /// `value` must be a concatenation of `(client, seq)` groups, for example
  /// `(0, 0)(1, 0)(0, 1)`. For every client the `seq` numbers must appear in
  /// order, starting from zero and increasing by one each time; groups of
  /// different clients may interleave freely.
  ///
  /// * `value` - the string to validate.
  /// * `clients` - number of clients; valid client ids are `0..clients`.
  /// * `expected` - how many groups each client is expected to have
  ///   contributed, indexed by client id.
  ///
  /// # Errors
  ///
  /// Returns an error message when `value` is not wrapped in parentheses, a
  /// number fails to parse, a group does not hold exactly two numbers, a
  /// client id is not below `clients`, or a client's sequence number is not the
  /// next one expected for that client.
  ///
  /// # Panics
  ///
  /// Panics if the per-client counts collected from `value` differ from
  /// `expected`.
  fn check_concurrent_results(
      value: String,
      clients: usize,
      expected: Vec<usize>,
  ) -> Result {
      if !value.starts_with('(') || !value.ends_with(')') {
          return Err(format!("Malformed value string {}", value));
      }
      // Both delimiters are single-byte ASCII and a one-character string cannot
      // pass the check above, so this slice is always in bounds.
      let inner_value = &value[1..value.len() - 1];

      // progress[c] is the next sequence number expected from client `c`.
      let mut progress = vec![0; clients];
      for pair_str in inner_value.split(")(") {
          // The local `Result` alias takes no type parameters, so the std type
          // has to be spelled out in full here.
          let nums = pair_str
              .split(", ")
              .map(|num_str| {
                  num_str.parse::<usize>().map_err(|_e| {
                      format!("Parsing '{:?}' failed within '{:?}'", num_str, value)
                  })
              })
              .collect::<std::result::Result<Vec<usize>, String>>()?;

          let (client, curr) = match nums.as_slice() {
              [client, curr] => (*client, *curr),
              _ => {
                  return Err(format!(
                      concat!(
                          "Expecting exactly two numbers in the same group,",
                          " got {} when parsing '{:?}' within '{:?}'",
                      ),
                      nums.len(),
                      pair_str,
                      value,
                  ));
              }
          };

          // Checked lookup: an unknown client id is reported as an error
          // instead of panicking on an out-of-bounds index.
          let next = match progress.get(client) {
              Some(&next) => next,
              None => {
                  return Err(format!(
                      "Client {} is out of range, expecting fewer than {} clients in {}",
                      client, clients, value,
                  ));
              }
          };
          if next != curr {
              return Err(format!(
                  "Client {} failed, expecting {}, got {}, others are {:?} in {}",
                  client, next, curr, progress, value,
              ));
          }
          progress[client] = curr + 1;
      }

      assert_eq!(progress, expected, "Expecting progress in {}", value);
      Ok(())
  }
  /// Runs `generic_test` with a single client and every other parameter left
  /// at its default.
  #[test]
  fn basic_service() {
      let params = GenericTestParams {
          clients: 1,
          ..GenericTestParams::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with five clients and every other parameter left at
  /// its default.
  #[test]
  fn concurrent_client() {
      let params = GenericTestParams {
          clients: 5,
          ..GenericTestParams::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with five clients and the `unreliable` flag set; all
  /// remaining parameters keep their defaults.
  #[test]
  fn unreliable_many_clients() {
      let params = GenericTestParams {
          clients: 5,
          unreliable: true,
          ..GenericTestParams::default()
      };
      generic_test(params);
  }
  /// Has `CLIENTS` spawned clients each append `ATTEMPTS` records of the form
  /// `(index, attempt)` to the single key `"k"`, then validates the final value
  /// with `check_concurrent_results`.
  ///
  /// NOTE(review): the config is built with `make_config(SERVERS, true, 0)`;
  /// going by the test name and banner, the `true` presumably selects an
  /// unreliable network. Confirm against `make_config`.
  #[test]
  fn unreliable_one_key_many_clients() -> Result {
      const SERVERS: usize = 5;
      const CLIENTS: usize = 5;
      const ATTEMPTS: usize = 10;

      let cfg = Arc::new(make_config(SERVERS, true, 0));
      // Clean-up is registered right away so it also runs on early exit.
      defer!(cfg.clean_up());

      let mut main_clerk = cfg.make_clerk();
      cfg.begin("Test: concurrent append to same key, unreliable (3A)");

      // Start from an empty value so the key holds only the appended records.
      main_clerk.put("k", "");

      let handles = spawn_clients(Arc::clone(&cfg), CLIENTS, |index, mut clerk| {
          for attempt in 0..ATTEMPTS {
              clerk.append("k", format!("({}, {})", index, attempt));
          }
      });
      for handle in handles {
          handle.join().expect("Client should never fail");
      }

      let final_value = main_clerk.get("k").expect("Key should exist");
      check_concurrent_results(final_value, CLIENTS, vec![ATTEMPTS; CLIENTS])
  }
  /// Exercises a single partition in three phases, each announced through
  /// `cfg.begin`:
  ///
  /// 1. After `cfg.partition()`, a clerk limited to the majority group can
  ///    still write and read `KEY`.
  /// 2. A `put` and a `get` issued on background threads through clerks
  ///    limited to the minority group must not have completed after one
  ///    second, while the majority clerk keeps working.
  /// 3. After `connect_all` / `connect_all_clerks`, both background operations
  ///    must complete within the polling window, and the final read through
  ///    the original clerk must return `"15"`.
  #[test]
  fn one_partition() -> Result {
      const SERVERS: usize = 5;
      const KEY: &str = "1";
      // Bit flags OR-ed into `finished` by the two background threads.
      const PUT_DONE: usize = 1;
      const GET_DONE: usize = 2;
      const ALL_DONE: usize = PUT_DONE | GET_DONE;

      let cfg = Arc::new(make_config(SERVERS, false, 0));
      defer!(cfg.clean_up());

      cfg.begin("Test: progress in majority (3A)");

      let mut clerk = cfg.make_clerk();
      clerk.put(KEY, "13");

      let (majority, minority) = cfg.partition();
      assert!(minority.len() < majority.len());
      assert_eq!(minority.len() + majority.len(), SERVERS);

      let mut clerk_majority = cfg.make_limited_clerk(&majority);
      let mut clerk_minority1 = cfg.make_limited_clerk(&minority);
      let mut clerk_minority2 = cfg.make_limited_clerk(&minority);

      clerk_majority.put(KEY, "14");
      assert_eq!(clerk_majority.get(KEY), Some("14".to_owned()));

      cfg.begin("Test: no progress in minority (3A)");

      let finished = Arc::new(AtomicUsize::new(0));
      {
          let finished = Arc::clone(&finished);
          // The thread is left detached; its completion is observed only
          // through the `finished` flags.
          std::thread::spawn(move || {
              clerk_minority1.put(KEY, "15");
              finished.fetch_or(PUT_DONE, Ordering::SeqCst);
          });
      }
      {
          let finished = Arc::clone(&finished);
          std::thread::spawn(move || {
              clerk_minority2.get(KEY);
              finished.fetch_or(GET_DONE, Ordering::SeqCst);
          });
      }

      // Give the minority operations a full second; neither flag may be set.
      sleep_millis(1000);
      assert_eq!(finished.load(Ordering::SeqCst), 0);

      // Meanwhile the majority clerk must still be able to read and write.
      assert_eq!(clerk_majority.get(KEY), Some("14".to_owned()));
      clerk_majority.put(KEY, "16");
      assert_eq!(clerk_majority.get(KEY), Some("16".to_owned()));

      cfg.begin("Test: completion after heal (3A)");

      cfg.connect_all();
      cfg.connect_all_clerks();

      sleep_election_timeouts(1);

      // Poll up to 100 times, 60 ms apart, until both threads have reported.
      let mut polls = 0;
      while polls < 100 {
          sleep_millis(60);
          if finished.load(Ordering::SeqCst) == ALL_DONE {
              break;
          }
          polls += 1;
      }

      assert_eq!(finished.load(Ordering::SeqCst), ALL_DONE);
      // NOTE(review): "15" is presumably expected because the blocked put is
      // applied only after the heal, i.e. after the "16" write. Confirm.
      assert_eq!(clerk.get(KEY), Some("15".to_owned()));

      Ok(())
  }
  /// Runs `generic_test` with one client and the `partition` flag set; all
  /// remaining parameters keep their defaults.
  #[test]
  fn many_partitions_one_client() {
      let params = GenericTestParams {
          clients: 1,
          partition: true,
          ..GenericTestParams::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with five clients and the `partition` flag set; all
  /// remaining parameters keep their defaults.
  #[test]
  fn many_partitions_many_client() {
      let params = GenericTestParams {
          clients: 5,
          partition: true,
          ..GenericTestParams::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with one client and the `crash` flag set; all
  /// remaining parameters keep their defaults.
  #[test]
  fn persist_one_client() {
      let params = GenericTestParams {
          clients: 1,
          crash: true,
          ..GenericTestParams::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with five clients and the `crash` flag set; all
  /// remaining parameters keep their defaults.
  #[test]
  fn persist_concurrent() {
      let params = GenericTestParams {
          clients: 5,
          crash: true,
          ..GenericTestParams::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with five clients and both the `unreliable` and
  /// `crash` flags set; all remaining parameters keep their defaults.
  #[test]
  fn persist_concurrent_unreliable() {
      let params = GenericTestParams {
          clients: 5,
          unreliable: true,
          crash: true,
          ..GenericTestParams::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with five clients and both the `partition` and
  /// `crash` flags set; all remaining parameters keep their defaults.
  #[test]
  fn persist_partition() {
      let params = GenericTestParams {
          clients: 5,
          partition: true,
          crash: true,
          ..GenericTestParams::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with five clients, the `unreliable`, `partition` and
  /// `crash` flags all set, and `min_ops` overridden to `Some(5)`; all
  /// remaining parameters keep their defaults.
  #[test]
  fn persist_partition_unreliable() {
      let params = GenericTestParams {
          clients: 5,
          unreliable: true,
          partition: true,
          crash: true,
          min_ops: Some(5),
          ..GenericTestParams::default()
      };
      generic_test(params);
  }
  /// Runs `generic_test` with fifteen clients, the `unreliable`, `partition`
  /// and `crash` flags set, and `test_linearizability` enabled.
  ///
  /// Unlike the other tests in this file, no field falls back to
  /// `Default::default()`: `maxraftstate` is given as `None` and `min_ops` as
  /// `Some(0)`.
  #[test]
  fn linearizability() {
      let params = GenericTestParams {
          clients: 15,
          unreliable: true,
          partition: true,
          crash: true,
          maxraftstate: None,
          min_ops: Some(0),
          test_linearizability: true,
      };
      generic_test(params);
  }
  216. }