service_test.rs 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. extern crate kvraft;
  2. extern crate rand;
  3. #[macro_use]
  4. extern crate scopeguard;
  5. use std::sync::atomic::{AtomicBool, Ordering};
  6. use std::sync::Arc;
  7. use std::thread::JoinHandle;
  8. use std::time::Duration;
  9. use rand::{thread_rng, Rng};
  10. use kvraft::testing_utils::config::{make_config, Config};
  11. use kvraft::Clerk;
  12. fn spawn_clients<T, Func>(
  13. config: Arc<Config>,
  14. clients: usize,
  15. func: Func,
  16. ) -> Vec<JoinHandle<T>>
  17. where
  18. T: 'static + Send,
  19. Func: 'static + Clone + Send + Sync + Fn(usize, Clerk) -> T,
  20. {
  21. let mut client_threads = vec![];
  22. for i in 0..clients {
  23. let clerk = config.make_clerk();
  24. let func = func.clone();
  25. client_threads.push(std::thread::spawn(move || func(i, clerk)))
  26. }
  27. eprintln!("spawning clients done.");
  28. client_threads
  29. }
  30. fn appending_client(
  31. index: usize,
  32. mut clerk: Clerk,
  33. stop: Arc<AtomicBool>,
  34. ) -> (usize, String) {
  35. eprintln!("client {} running.", index);
  36. let mut op_count = 0usize;
  37. let key = index.to_string();
  38. let mut last = String::new();
  39. let mut rng = thread_rng();
  40. clerk.put(key.clone(), last.clone());
  41. while !stop.load(Ordering::Acquire) {
  42. eprintln!("client {} starting {}.", index, op_count);
  43. if rng.gen_ratio(1, 2) {
  44. let value = format!("({}, {}), ", index, op_count);
  45. last.push_str(&value);
  46. clerk.append(key.clone(), value);
  47. op_count += 1;
  48. } else {
  49. let value = clerk
  50. .get(key.clone())
  51. .expect(&format!("Key {} should exist.", index));
  52. assert_eq!(value, last);
  53. }
  54. eprintln!("client {} done {}.", index, op_count);
  55. }
  56. eprintln!("client {} done.", index);
  57. (op_count, last)
  58. }
  59. fn generic_test(clients: usize, unreliable: bool, maxraftstate: usize) {
  60. const SERVERS: usize = 5;
  61. let cfg = Arc::new(make_config(SERVERS, unreliable, maxraftstate));
  62. // TODO(ditsing): add `defer!(cfg.clean_up());`
  63. cfg.begin("");
  64. let mut clerk = cfg.make_clerk();
  65. const ROUNDS: usize = 3;
  66. for _ in 0..ROUNDS {
  67. // Network partition thread.
  68. let partition_stop = Arc::new(AtomicBool::new(false));
  69. // KV server clients.
  70. let clients_stop = Arc::new(AtomicBool::new(false));
  71. let config = cfg.clone();
  72. let clients_stop_clone = clients_stop.clone();
  73. let spawn_client_results = std::thread::spawn(move || {
  74. spawn_clients(config, clients, move |index: usize, clerk: Clerk| {
  75. appending_client(index, clerk, clients_stop_clone.clone())
  76. })
  77. });
  78. std::thread::sleep(Duration::from_secs(5));
  79. // Stop partitions.
  80. partition_stop.store(true, Ordering::Release);
  81. // Tell all clients to stop.
  82. clients_stop.store(true, Ordering::Release);
  83. let client_results = spawn_client_results
  84. .join()
  85. .expect("Spawning clients should never fail.");
  86. for (index, client_result) in client_results.into_iter().enumerate() {
  87. let (op_count, last_result) =
  88. client_result.join().expect("Client should never fail.");
  89. let real_result = clerk
  90. .get(index.to_string())
  91. .expect(&format!("Key {} should exist.", index));
  92. assert_eq!(real_result, last_result);
  93. assert!(
  94. op_count > 10,
  95. "Client committed only {} operations",
  96. op_count
  97. );
  98. }
  99. }
  100. cfg.end();
  101. }
  /// Runs `generic_test` with one client, `unreliable` set to `false`, and a
  /// `maxraftstate` of 0.
  #[test]
  fn basic_service() {
      let clients: usize = 1;
      let unreliable = false;
      let maxraftstate: usize = 0;
      generic_test(clients, unreliable, maxraftstate);
  }
  /// Runs `generic_test` with five concurrent clients, `unreliable` set to
  /// `false`, and a `maxraftstate` of 0.
  #[test]
  fn concurrent_client() {
      let clients: usize = 5;
      let unreliable = false;
      let maxraftstate: usize = 0;
      generic_test(clients, unreliable, maxraftstate);
  }
  /// Runs `generic_test` with five concurrent clients, `unreliable` set to
  /// `true`, and a `maxraftstate` of 0.
  #[test]
  fn unreliable() {
      let clients: usize = 5;
      let unreliable = true;
      let maxraftstate: usize = 0;
      generic_test(clients, unreliable, maxraftstate);
  }