فهرست منبع

Move generic_test to testing_utils.

Jing Yang 4 سال پیش
والد
کامیت
027859bd71
5 فایل‌های تغییر یافته به همراه 176 افزوده شده و 162 حذف شده
  1. 1 0
      kvraft/Cargo.toml
  2. 2 0
      kvraft/src/lib.rs
  3. 167 0
      kvraft/src/testing_utils/generic_test.rs
  4. 1 0
      kvraft/src/testing_utils/mod.rs
  5. 5 162
      kvraft/tests/service_test.rs

+ 1 - 0
kvraft/Cargo.toml

@@ -11,6 +11,7 @@ labrpc = { path = "../../labrpc" }
 parking_lot = "0.11.1"
 rand = "0.8"
 ruaft = { path = "../" }
+scopeguard = "1.1.0"
 serde = "1.0.116"
 serde_derive = "1.0.116"
 tokio = { version = "1.0", features = ["rt-multi-thread", "time", "parking_lot"] }

+ 2 - 0
kvraft/src/lib.rs

@@ -3,6 +3,8 @@ extern crate labrpc;
 extern crate parking_lot;
 extern crate rand;
 extern crate ruaft;
+#[macro_use]
+extern crate scopeguard;
 extern crate serde;
 #[macro_use]
 extern crate serde_derive;

+ 167 - 0
kvraft/src/testing_utils/generic_test.rs

@@ -0,0 +1,167 @@
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::thread::JoinHandle;
+use std::time::Duration;
+
+use rand::{thread_rng, Rng};
+
+use crate::testing_utils::config::{
+    make_config, sleep_election_timeouts, sleep_millis, Config,
+    LONG_ELECTION_TIMEOUT_MILLIS,
+};
+use crate::Clerk;
+
+pub fn spawn_clients<T, Func>(
+    config: Arc<Config>,
+    clients: usize,
+    func: Func,
+) -> Vec<JoinHandle<T>>
+where
+    T: 'static + Send,
+    Func: 'static + Clone + Send + Sync + Fn(usize, Clerk) -> T,
+{
+    let mut client_threads = vec![];
+    for i in 0..clients {
+        let clerk = config.make_clerk();
+        let func = func.clone();
+        client_threads.push(std::thread::spawn(move || func(i, clerk)))
+    }
+    eprintln!("spawning clients done.");
+    client_threads
+}
+
+fn appending_client(
+    index: usize,
+    mut clerk: Clerk,
+    stop: Arc<AtomicBool>,
+) -> (usize, String) {
+    eprintln!("client {} running.", index);
+    let mut op_count = 0usize;
+    let key = index.to_string();
+    let mut last = String::new();
+    let mut rng = thread_rng();
+
+    clerk.put(&key, &last);
+
+    while !stop.load(Ordering::Acquire) {
+        eprintln!("client {} starting {}.", index, op_count);
+        if rng.gen_ratio(1, 2) {
+            let value = format!("({}, {}), ", index, op_count);
+
+            last.push_str(&value);
+            clerk.append(&key, &value);
+
+            op_count += 1;
+        } else {
+            let value = clerk
+                .get(&key)
+                .expect(&format!("Key {} should exist.", index));
+            assert_eq!(value, last);
+        }
+        eprintln!("client {} done {}.", index, op_count);
+    }
+    eprintln!("client {} done.", index);
+    (op_count, last)
+}
+
+const PARTITION_MAX_DELAY_MILLIS: u64 = 200;
+
+fn run_partition(cfg: Arc<Config>, stop: Arc<AtomicBool>) {
+    while !stop.load(Ordering::Acquire) {
+        cfg.random_partition();
+        let delay = thread_rng().gen_range(
+            LONG_ELECTION_TIMEOUT_MILLIS
+                ..LONG_ELECTION_TIMEOUT_MILLIS + PARTITION_MAX_DELAY_MILLIS,
+        );
+        std::thread::sleep(Duration::from_millis(delay));
+    }
+}
+
+#[derive(Default)]
+pub struct GenericTestParams {
+    pub clients: usize,
+    pub unreliable: bool,
+    pub partition: bool,
+    pub crash: bool,
+    pub maxraftstate: Option<usize>,
+}
+
+pub fn generic_test(test_params: GenericTestParams) {
+    let GenericTestParams {
+        clients,
+        unreliable,
+        partition,
+        crash,
+        maxraftstate,
+    } = test_params;
+    let maxraftstate = maxraftstate.unwrap_or(usize::MAX);
+    const SERVERS: usize = 5;
+    let cfg = Arc::new(make_config(SERVERS, unreliable, maxraftstate));
+    defer!(cfg.clean_up());
+
+    cfg.begin("");
+    let mut clerk = cfg.make_clerk();
+
+    const ROUNDS: usize = 3;
+    for _ in 0..ROUNDS {
+        // Network partition thread.
+        let partition_stop = Arc::new(AtomicBool::new(false));
+        // KV server clients.
+        let clients_stop = Arc::new(AtomicBool::new(false));
+
+        let config = cfg.clone();
+        let clients_stop_clone = clients_stop.clone();
+        let spawn_client_results = std::thread::spawn(move || {
+            spawn_clients(config, clients, move |index: usize, clerk: Clerk| {
+                appending_client(index, clerk, clients_stop_clone.clone())
+            })
+        });
+
+        let partition_result = if partition {
+            // Let the clients perform some operations without interruption.
+            sleep_millis(1000);
+            let config = cfg.clone();
+            let partition_stop_clone = partition_stop.clone();
+            Some(std::thread::spawn(|| {
+                run_partition(config, partition_stop_clone)
+            }))
+        } else {
+            None
+        };
+
+        if crash {
+            cfg.crash_all();
+            sleep_election_timeouts(1);
+            cfg.restart_all();
+        }
+
+        std::thread::sleep(Duration::from_secs(5));
+
+        // Stop partitions.
+        partition_stop.store(true, Ordering::Release);
+        partition_result.map(|result| {
+            result.join().expect("Partition thread should never fail");
+            cfg.connect_all();
+            sleep_election_timeouts(1);
+        });
+
+        // Tell all clients to stop.
+        clients_stop.store(true, Ordering::Release);
+
+        let client_results = spawn_client_results
+            .join()
+            .expect("Spawning clients should never fail.");
+        for (index, client_result) in client_results.into_iter().enumerate() {
+            let (op_count, last_result) =
+                client_result.join().expect("Client should never fail.");
+            let real_result = clerk
+                .get(index.to_string())
+                .expect(&format!("Key {} should exist.", index));
+            assert_eq!(real_result, last_result);
+            eprintln!("Client {} committed {} operations", index, op_count);
+            assert!(op_count > 10, "Client committed less than 10 operations");
+        }
+    }
+
+    cfg.end();
+}

+ 1 - 0
kvraft/src/testing_utils/mod.rs

@@ -1,3 +1,4 @@
 pub mod config;
+pub mod generic_test;
 mod memory_persister;
 mod rpcs;

+ 5 - 162
kvraft/tests/service_test.rs

@@ -1,177 +1,20 @@
 #[macro_use]
 extern crate anyhow;
 extern crate kvraft;
-extern crate rand;
 #[macro_use]
 extern crate scopeguard;
 
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::thread::JoinHandle;
-use std::time::Duration;
 
 use anyhow::Context;
-use rand::{thread_rng, Rng};
 
 use kvraft::testing_utils::config::{
-    make_config, sleep_election_timeouts, sleep_millis, Config,
-    LONG_ELECTION_TIMEOUT_MILLIS,
+    make_config, sleep_election_timeouts, sleep_millis,
+};
+use kvraft::testing_utils::generic_test::{
+    generic_test, spawn_clients, GenericTestParams,
 };
-use kvraft::Clerk;
-
-fn spawn_clients<T, Func>(
-    config: Arc<Config>,
-    clients: usize,
-    func: Func,
-) -> Vec<JoinHandle<T>>
-where
-    T: 'static + Send,
-    Func: 'static + Clone + Send + Sync + Fn(usize, Clerk) -> T,
-{
-    let mut client_threads = vec![];
-    for i in 0..clients {
-        let clerk = config.make_clerk();
-        let func = func.clone();
-        client_threads.push(std::thread::spawn(move || func(i, clerk)))
-    }
-    eprintln!("spawning clients done.");
-    client_threads
-}
-
-fn appending_client(
-    index: usize,
-    mut clerk: Clerk,
-    stop: Arc<AtomicBool>,
-) -> (usize, String) {
-    eprintln!("client {} running.", index);
-    let mut op_count = 0usize;
-    let key = index.to_string();
-    let mut last = String::new();
-    let mut rng = thread_rng();
-
-    clerk.put(&key, &last);
-
-    while !stop.load(Ordering::Acquire) {
-        eprintln!("client {} starting {}.", index, op_count);
-        if rng.gen_ratio(1, 2) {
-            let value = format!("({}, {}), ", index, op_count);
-
-            last.push_str(&value);
-            clerk.append(&key, &value);
-
-            op_count += 1;
-        } else {
-            let value = clerk
-                .get(&key)
-                .expect(&format!("Key {} should exist.", index));
-            assert_eq!(value, last);
-        }
-        eprintln!("client {} done {}.", index, op_count);
-    }
-    eprintln!("client {} done.", index);
-    (op_count, last)
-}
-
-const PARTITION_MAX_DELAY_MILLIS: u64 = 200;
-fn run_partition(cfg: Arc<Config>, stop: Arc<AtomicBool>) {
-    while !stop.load(Ordering::Acquire) {
-        cfg.random_partition();
-        let delay = thread_rng().gen_range(
-            LONG_ELECTION_TIMEOUT_MILLIS
-                ..LONG_ELECTION_TIMEOUT_MILLIS + PARTITION_MAX_DELAY_MILLIS,
-        );
-        std::thread::sleep(Duration::from_millis(delay));
-    }
-}
-
-#[derive(Default)]
-struct GenericTestParams {
-    clients: usize,
-    unreliable: bool,
-    partition: bool,
-    crash: bool,
-    maxraftstate: Option<usize>,
-}
-
-fn generic_test(test_params: GenericTestParams) {
-    let GenericTestParams {
-        clients,
-        unreliable,
-        partition,
-        crash,
-        maxraftstate,
-    } = test_params;
-    let maxraftstate = maxraftstate.unwrap_or(usize::MAX);
-    const SERVERS: usize = 5;
-    let cfg = Arc::new(make_config(SERVERS, unreliable, maxraftstate));
-    defer!(cfg.clean_up());
-
-    cfg.begin("");
-    let mut clerk = cfg.make_clerk();
-
-    const ROUNDS: usize = 3;
-    for _ in 0..ROUNDS {
-        // Network partition thread.
-        let partition_stop = Arc::new(AtomicBool::new(false));
-        // KV server clients.
-        let clients_stop = Arc::new(AtomicBool::new(false));
-
-        let config = cfg.clone();
-        let clients_stop_clone = clients_stop.clone();
-        let spawn_client_results = std::thread::spawn(move || {
-            spawn_clients(config, clients, move |index: usize, clerk: Clerk| {
-                appending_client(index, clerk, clients_stop_clone.clone())
-            })
-        });
-
-        let partition_result = if partition {
-            // Let the clients perform some operations without interruption.
-            sleep_millis(1000);
-            let config = cfg.clone();
-            let partition_stop_clone = partition_stop.clone();
-            Some(std::thread::spawn(|| {
-                run_partition(config, partition_stop_clone)
-            }))
-        } else {
-            None
-        };
-
-        if crash {
-            cfg.crash_all();
-            sleep_election_timeouts(1);
-            cfg.restart_all();
-        }
-
-        std::thread::sleep(Duration::from_secs(5));
-
-        // Stop partitions.
-        partition_stop.store(true, Ordering::Release);
-        partition_result.map(|result| {
-            result.join().expect("Partition thread should never fail");
-            cfg.connect_all();
-            sleep_election_timeouts(1);
-        });
-
-        // Tell all clients to stop.
-        clients_stop.store(true, Ordering::Release);
-
-        let client_results = spawn_client_results
-            .join()
-            .expect("Spawning clients should never fail.");
-        for (index, client_result) in client_results.into_iter().enumerate() {
-            let (op_count, last_result) =
-                client_result.join().expect("Client should never fail.");
-            let real_result = clerk
-                .get(index.to_string())
-                .expect(&format!("Key {} should exist.", index));
-            assert_eq!(real_result, last_result);
-            eprintln!("Client {} committed {} operations", index, op_count);
-            assert!(op_count > 10, "Client committed less than 10 operations");
-        }
-    }
-
-    cfg.end();
-}
 
 fn check_concurrent_results(
     value: String,