| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352 |
- #[macro_use]
- extern crate anyhow;
- extern crate kvraft;
- extern crate rand;
- #[macro_use]
- extern crate scopeguard;
- use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
- use std::sync::Arc;
- use std::thread::JoinHandle;
- use std::time::Duration;
- use anyhow::Context;
- use rand::{thread_rng, Rng};
- use kvraft::testing_utils::config::{
- make_config, sleep_election_timeouts, sleep_millis, Config,
- LONG_ELECTION_TIMEOUT_MILLIS,
- };
- use kvraft::Clerk;
- fn spawn_clients<T, Func>(
- config: Arc<Config>,
- clients: usize,
- func: Func,
- ) -> Vec<JoinHandle<T>>
- where
- T: 'static + Send,
- Func: 'static + Clone + Send + Sync + Fn(usize, Clerk) -> T,
- {
- let mut client_threads = vec![];
- for i in 0..clients {
- let clerk = config.make_clerk();
- let func = func.clone();
- client_threads.push(std::thread::spawn(move || func(i, clerk)))
- }
- eprintln!("spawning clients done.");
- client_threads
- }
- fn appending_client(
- index: usize,
- mut clerk: Clerk,
- stop: Arc<AtomicBool>,
- ) -> (usize, String) {
- eprintln!("client {} running.", index);
- let mut op_count = 0usize;
- let key = index.to_string();
- let mut last = String::new();
- let mut rng = thread_rng();
- clerk.put(&key, &last);
- while !stop.load(Ordering::Acquire) {
- eprintln!("client {} starting {}.", index, op_count);
- if rng.gen_ratio(1, 2) {
- let value = format!("({}, {}), ", index, op_count);
- last.push_str(&value);
- clerk.append(&key, &value);
- op_count += 1;
- } else {
- let value = clerk
- .get(&key)
- .expect(&format!("Key {} should exist.", index));
- assert_eq!(value, last);
- }
- eprintln!("client {} done {}.", index, op_count);
- }
- eprintln!("client {} done.", index);
- (op_count, last)
- }
- const PARTITION_MAX_DELAY_MILLIS: u64 = 200;
- fn run_partition(cfg: Arc<Config>, stop: Arc<AtomicBool>) {
- while !stop.load(Ordering::Acquire) {
- let mut indexes = cfg.shuffled_indexes();
- let len = indexes.len();
- cfg.partition(&(indexes.split_off(len / 2)), &indexes);
- let delay = thread_rng().gen_range(
- LONG_ELECTION_TIMEOUT_MILLIS
- ..LONG_ELECTION_TIMEOUT_MILLIS + PARTITION_MAX_DELAY_MILLIS,
- );
- std::thread::sleep(Duration::from_millis(delay));
- }
- }
/// Configuration knobs for `generic_test`; tests construct this with
/// struct-update syntax over `Default::default()` and set only the
/// dimensions they exercise.
#[derive(Default)]
struct GenericTestParams {
    // Number of concurrent client threads to spawn each round.
    clients: usize,
    // Passed to `make_config`; presumably makes the simulated network drop
    // or reorder messages — confirm against testing_utils::config.
    unreliable: bool,
    // When true, a background thread repeatedly partitions the cluster.
    partition: bool,
    // Forwarded to `make_config` after `unwrap_or(usize::MAX)`, so `None`
    // means "no limit"; presumably a raft-state/snapshot threshold — confirm.
    maxraftstate: Option<usize>,
}
- fn generic_test(test_params: GenericTestParams) {
- let GenericTestParams {
- clients,
- unreliable,
- partition,
- maxraftstate,
- } = test_params;
- let maxraftstate = maxraftstate.unwrap_or(usize::MAX);
- const SERVERS: usize = 5;
- let cfg = Arc::new(make_config(SERVERS, unreliable, maxraftstate));
- defer!(cfg.clean_up());
- cfg.begin("");
- let mut clerk = cfg.make_clerk();
- const ROUNDS: usize = 3;
- for _ in 0..ROUNDS {
- // Network partition thread.
- let partition_stop = Arc::new(AtomicBool::new(false));
- // KV server clients.
- let clients_stop = Arc::new(AtomicBool::new(false));
- let config = cfg.clone();
- let clients_stop_clone = clients_stop.clone();
- let spawn_client_results = std::thread::spawn(move || {
- spawn_clients(config, clients, move |index: usize, clerk: Clerk| {
- appending_client(index, clerk, clients_stop_clone.clone())
- })
- });
- let partition_result = if partition {
- let config = cfg.clone();
- let partition_stop_clone = partition_stop.clone();
- Some(std::thread::spawn(|| {
- run_partition(config, partition_stop_clone)
- }))
- } else {
- None
- };
- std::thread::sleep(Duration::from_secs(5));
- // Stop partitions.
- partition_stop.store(true, Ordering::Release);
- partition_result.map(|result| {
- result.join().expect("Partition thread should never fail");
- cfg.connect_all();
- sleep_election_timeouts(1);
- });
- // Tell all clients to stop.
- clients_stop.store(true, Ordering::Release);
- let client_results = spawn_client_results
- .join()
- .expect("Spawning clients should never fail.");
- for (index, client_result) in client_results.into_iter().enumerate() {
- let (op_count, last_result) =
- client_result.join().expect("Client should never fail.");
- let real_result = clerk
- .get(index.to_string())
- .expect(&format!("Key {} should exist.", index));
- assert_eq!(real_result, last_result);
- assert!(
- op_count > 10,
- "Client committed only {} operations",
- op_count
- );
- }
- }
- cfg.end();
- }
- fn check_concurrent_results(
- value: String,
- clients: usize,
- expected: Vec<usize>,
- ) -> anyhow::Result<()> {
- if !value.starts_with('(') || !value.ends_with(')') {
- bail!("Malformed value string {}", value)
- }
- let inner_value = &value[1..value.len() - 1];
- let mut progress = vec![0; clients];
- for pair_str in inner_value.split(")(") {
- let mut nums = vec![];
- for num_str in pair_str.split(", ") {
- let num: usize = num_str.parse().context(format!(
- "Parsing '{:?}' failed within '{:?}'",
- num_str, value,
- ))?;
- nums.push(num);
- }
- if nums.len() != 2 {
- bail!(
- concat!(
- "More than two numbers in the same group when",
- " parsing '{:?}' failed within '{:?}'",
- ),
- pair_str,
- value,
- );
- }
- let (client, curr) = (nums[0], nums[1]);
- if progress[client] != curr {
- bail!(
- "Client {} failed, expecting {}, got {}, others are {:?} in {}",
- client,
- progress[client],
- curr,
- progress,
- value,
- )
- }
- progress[client] = curr + 1;
- }
- assert_eq!(progress, expected, "Expecting progress in {}", value);
- Ok(())
- }
/// Smoke test: one client, reliable network, no partitions.
#[test]
fn basic_service() {
    let params = GenericTestParams {
        clients: 1,
        ..GenericTestParams::default()
    };
    generic_test(params);
}
/// Five concurrent clients on a reliable network, no partitions.
#[test]
fn concurrent_client() {
    let params = GenericTestParams {
        clients: 5,
        ..GenericTestParams::default()
    };
    generic_test(params);
}
/// Five concurrent clients over an unreliable network, no partitions.
#[test]
fn unreliable_many_clients() {
    let params = GenericTestParams {
        clients: 5,
        unreliable: true,
        ..GenericTestParams::default()
    };
    generic_test(params);
}
/// Five clients concurrently append distinct `(client, attempt)` pairs to a
/// single key over an unreliable network; the final value must contain every
/// client's appends, each client's in order.
#[test]
fn unreliable_one_key_many_clients() -> anyhow::Result<()> {
    const SERVERS: usize = 5;
    const CLIENTS: usize = 5;
    const ATTEMPTS: usize = 10;

    let cfg = Arc::new(make_config(SERVERS, true, 0));
    defer!(cfg.clean_up());
    let mut clerk = cfg.make_clerk();
    cfg.begin("Test: concurrent append to same key, unreliable (3A)");

    clerk.put("k", "");
    let handles = spawn_clients(cfg.clone(), CLIENTS, |index, mut clerk| {
        for attempt in 0..ATTEMPTS {
            clerk.append("k", format!("({}, {})", index, attempt));
        }
    });
    for handle in handles {
        handle.join().expect("Client should never fail");
    }

    let value = clerk.get("k").expect("Key should exist");
    check_concurrent_results(value, CLIENTS, vec![ATTEMPTS; CLIENTS])
}
/// Checks that only the majority side of a partition makes progress, that
/// the minority side blocks, and that minority operations complete once the
/// partition heals. Timing-sensitive: relies on fixed sleeps and election
/// timeouts from the test config.
#[test]
fn one_partition() -> anyhow::Result<()> {
    const SERVERS: usize = 5;
    let cfg = Arc::new(make_config(SERVERS, false, 0));
    defer!(cfg.clean_up());
    cfg.begin("Test: progress in majority (3A)");
    const KEY: &str = "1";
    let mut clerk = cfg.make_clerk();
    clerk.put(KEY, "13");
    // Split servers into a majority and a minority side.
    let (majority, minority) = cfg.make_partition();
    assert!(minority.len() < majority.len());
    assert_eq!(minority.len() + majority.len(), SERVERS);
    cfg.partition(&majority, &minority);
    // Clerks restricted to one side of the partition each.
    let mut clerk_majority = cfg.make_limited_clerk(&majority);
    let mut clerk_minority1 = cfg.make_limited_clerk(&minority);
    let mut clerk_minority2 = cfg.make_limited_clerk(&minority);
    // The majority side must still commit and serve operations.
    clerk_majority.put(KEY, "14");
    assert_eq!(clerk_majority.get(KEY), Some("14".to_owned()));
    cfg.begin("Test: no progress in minority (3A)");
    // Bit 1 = minority put finished, bit 2 = minority get finished; the
    // threads set their bit only after their (blocked) operation returns.
    let counter = Arc::new(AtomicUsize::new(0));
    let counter1 = counter.clone();
    std::thread::spawn(move || {
        clerk_minority1.put(KEY, "15");
        counter1.fetch_or(1, Ordering::SeqCst);
    });
    let counter2 = counter.clone();
    std::thread::spawn(move || {
        clerk_minority2.get(KEY);
        counter2.fetch_or(2, Ordering::SeqCst);
    });
    sleep_millis(1000);
    // Neither minority operation may have completed while partitioned.
    assert_eq!(counter.load(Ordering::SeqCst), 0);
    assert_eq!(clerk_majority.get(KEY), Some("14".to_owned()));
    clerk_majority.put(KEY, "16");
    assert_eq!(clerk_majority.get(KEY), Some("16".to_owned()));
    cfg.begin("Test: completion after heal (3A)");
    cfg.connect_all();
    cfg.connect_all_clerks();
    sleep_election_timeouts(1);
    // Poll up to ~6s for both minority operations to finish (both bits set).
    for _ in 0..100 {
        sleep_millis(60);
        if counter.load(Ordering::SeqCst) == 3 {
            break;
        }
    }
    assert_eq!(counter.load(Ordering::SeqCst), 3);
    // The minority put ("15") lands after the majority's "16" — TODO confirm
    // this ordering guarantee against the Clerk retry semantics.
    assert_eq!(clerk.get(KEY), Some("15".to_owned()));
    Ok(())
}
/// One client while the cluster is repeatedly and randomly partitioned.
#[test]
fn many_partitions_one_client() {
    let params = GenericTestParams {
        clients: 1,
        partition: true,
        ..GenericTestParams::default()
    };
    generic_test(params);
}
/// Five clients while the cluster is repeatedly and randomly partitioned.
#[test]
fn many_partitions_many_client() {
    let params = GenericTestParams {
        clients: 5,
        partition: true,
        ..GenericTestParams::default()
    };
    generic_test(params);
}
|