service_test.rs 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. #[macro_use]
  2. extern crate anyhow;
  3. extern crate kvraft;
  4. #[macro_use]
  5. extern crate scopeguard;
  6. use std::sync::atomic::{AtomicUsize, Ordering};
  7. use std::sync::Arc;
  8. use anyhow::Context;
  9. use kvraft::testing_utils::config::{
  10. make_config, sleep_election_timeouts, sleep_millis,
  11. };
  12. use kvraft::testing_utils::generic_test::{
  13. generic_test, spawn_clients, GenericTestParams,
  14. };
  15. fn check_concurrent_results(
  16. value: String,
  17. clients: usize,
  18. expected: Vec<usize>,
  19. ) -> anyhow::Result<()> {
  20. if !value.starts_with('(') || !value.ends_with(')') {
  21. bail!("Malformed value string {}", value)
  22. }
  23. let inner_value = &value[1..value.len() - 1];
  24. let mut progress = vec![0; clients];
  25. for pair_str in inner_value.split(")(") {
  26. let mut nums = vec![];
  27. for num_str in pair_str.split(", ") {
  28. let num: usize = num_str.parse().context(format!(
  29. "Parsing '{:?}' failed within '{:?}'",
  30. num_str, value,
  31. ))?;
  32. nums.push(num);
  33. }
  34. if nums.len() != 2 {
  35. bail!(
  36. concat!(
  37. "More than two numbers in the same group when",
  38. " parsing '{:?}' failed within '{:?}'",
  39. ),
  40. pair_str,
  41. value,
  42. );
  43. }
  44. let (client, curr) = (nums[0], nums[1]);
  45. if progress[client] != curr {
  46. bail!(
  47. "Client {} failed, expecting {}, got {}, others are {:?} in {}",
  48. client,
  49. progress[client],
  50. curr,
  51. progress,
  52. value,
  53. )
  54. }
  55. progress[client] = curr + 1;
  56. }
  57. assert_eq!(progress, expected, "Expecting progress in {}", value);
  58. Ok(())
  59. }
  /// Runs the generic test with one client and defaults for everything else.
  #[test]
  fn basic_service() {
      let params = GenericTestParams {
          clients: 1,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the generic test with five clients and defaults for everything else.
  #[test]
  fn concurrent_client() {
      let params = GenericTestParams {
          clients: 5,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the generic test with five clients and the `unreliable` flag set.
  #[test]
  fn unreliable_many_clients() {
      let params = GenericTestParams {
          clients: 5,
          unreliable: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Has several clients append concurrently to the same key, then validates
  /// the stored value.
  ///
  /// Each of the `CLIENTS` clients appends `ATTEMPTS` records of the form
  /// `(index, i)` to key `"k"`. Once every client thread has been joined, the
  /// value is read back and handed to `check_concurrent_results`, expecting
  /// `ATTEMPTS` records from each client.
  #[test]
  fn unreliable_one_key_many_clients() -> anyhow::Result<()> {
      const SERVERS: usize = 5;
      const CLIENTS: usize = 5;
      const ATTEMPTS: usize = 10;

      let cfg = Arc::new(make_config(SERVERS, true, 0));
      defer!(cfg.clean_up());
      let mut clerk = cfg.make_clerk();

      cfg.begin("Test: concurrent append to same key, unreliable (3A)");

      // Start from an empty value so only the appended records end up in it.
      clerk.put("k", "");

      let handles =
          spawn_clients(cfg.clone(), CLIENTS, |index, mut client_clerk| {
              for attempt in 0..ATTEMPTS {
                  let record = format!("({}, {})", index, attempt);
                  client_clerk.append("k", record);
              }
          });
      handles.into_iter().for_each(|handle| {
          handle.join().expect("Client should never fail");
      });

      let stored = clerk.get("k").expect("Key should exist");
      let expected = vec![ATTEMPTS; CLIENTS];
      check_concurrent_results(stored, CLIENTS, expected)
  }
  /// Exercises the service across a network partition in three phases.
  ///
  /// 1. A clerk limited to the majority side can still put and get.
  /// 2. A put and a get issued through clerks limited to the minority side do
  ///    not complete within one second, while the majority keeps working.
  /// 3. After everything is reconnected, both pending minority operations
  ///    complete and the key holds the value written by the minority put.
  #[test]
  fn one_partition() -> anyhow::Result<()> {
      const SERVERS: usize = 5;
      const KEY: &str = "1";
      // Bits recorded in the shared flag word by the two minority operations.
      const PUT_DONE: usize = 1;
      const GET_DONE: usize = 2;

      let cfg = Arc::new(make_config(SERVERS, false, 0));
      defer!(cfg.clean_up());

      cfg.begin("Test: progress in majority (3A)");

      let mut clerk = cfg.make_clerk();
      clerk.put(KEY, "13");

      let (majority, minority) = cfg.partition();
      assert!(minority.len() < majority.len());
      assert_eq!(minority.len() + majority.len(), SERVERS);

      let mut majority_clerk = cfg.make_limited_clerk(&majority);
      let mut minority_writer = cfg.make_limited_clerk(&minority);
      let mut minority_reader = cfg.make_limited_clerk(&minority);

      majority_clerk.put(KEY, "14");
      assert_eq!(majority_clerk.get(KEY), Some("14".to_string()));

      cfg.begin("Test: no progress in minority (3A)");

      let done_flags = Arc::new(AtomicUsize::new(0));

      // The threads are intentionally detached; completion is observed through
      // the flag word rather than by joining.
      let writer_flags = Arc::clone(&done_flags);
      std::thread::spawn(move || {
          minority_writer.put(KEY, "15");
          writer_flags.fetch_or(PUT_DONE, Ordering::SeqCst);
      });
      let reader_flags = Arc::clone(&done_flags);
      std::thread::spawn(move || {
          minority_reader.get(KEY);
          reader_flags.fetch_or(GET_DONE, Ordering::SeqCst);
      });

      sleep_millis(1000);
      // Neither minority operation may have finished while partitioned.
      assert_eq!(done_flags.load(Ordering::SeqCst), 0);

      assert_eq!(majority_clerk.get(KEY), Some("14".to_string()));
      majority_clerk.put(KEY, "16");
      assert_eq!(majority_clerk.get(KEY), Some("16".to_string()));

      cfg.begin("Test: completion after heal (3A)");

      cfg.connect_all();
      cfg.connect_all_clerks();
      sleep_election_timeouts(1);

      // Poll up to 100 times, 60 ms apart, stopping as soon as both minority
      // operations have reported completion.
      let _both_done = (0..100).any(|_| {
          sleep_millis(60);
          done_flags.load(Ordering::SeqCst) == PUT_DONE | GET_DONE
      });

      assert_eq!(done_flags.load(Ordering::SeqCst), PUT_DONE | GET_DONE);
      assert_eq!(clerk.get(KEY), Some("15".to_string()));

      Ok(())
  }
  /// Runs the generic test with one client and the `partition` flag set.
  #[test]
  fn many_partitions_one_client() {
      let params = GenericTestParams {
          clients: 1,
          partition: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the generic test with five clients and the `partition` flag set.
  #[test]
  fn many_partitions_many_client() {
      let params = GenericTestParams {
          clients: 5,
          partition: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the generic test with one client and the `crash` flag set.
  #[test]
  fn persist_one_client() {
      let params = GenericTestParams {
          clients: 1,
          crash: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the generic test with five clients and the `crash` flag set.
  #[test]
  fn persist_concurrent() {
      let params = GenericTestParams {
          clients: 5,
          crash: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the generic test with five clients and both the `unreliable` and
  /// `crash` flags set.
  #[test]
  fn persist_concurrent_unreliable() {
      let params = GenericTestParams {
          clients: 5,
          unreliable: true,
          crash: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the generic test with five clients and both the `partition` and
  /// `crash` flags set.
  #[test]
  fn persist_partition() {
      let params = GenericTestParams {
          clients: 5,
          partition: true,
          crash: true,
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the generic test with five clients, the `unreliable`, `partition`
  /// and `crash` flags set, and `min_ops` set to `Some(5)`.
  #[test]
  fn persist_partition_unreliable() {
      let params = GenericTestParams {
          clients: 5,
          unreliable: true,
          partition: true,
          crash: true,
          min_ops: Some(5),
          ..Default::default()
      };
      generic_test(params);
  }
  /// Runs the generic test with fifteen clients, the `unreliable`, `partition`
  /// and `crash` flags set, and `test_linearizability` enabled.
  ///
  /// Every field is written out rather than taken from `Default`, so the exact
  /// configuration is visible here: `maxraftstate` is `None` and `min_ops` is
  /// `Some(0)`.
  #[test]
  fn linearizability() {
      let params = GenericTestParams {
          clients: 15,
          unreliable: true,
          partition: true,
          crash: true,
          maxraftstate: None,
          min_ops: Some(0),
          test_linearizability: true,
      };
      generic_test(params);
  }
  224. }