Jing Yang 5 лет назад
Родитель
Сommit
fffc60fd89
1 измененных файлов с 133 добавлено и 1 удалено
  1. 133 1
      tests/persist_tests.rs

+ 133 - 1
tests/persist_tests.rs

@@ -4,9 +4,12 @@ extern crate bytes;
 extern crate labrpc;
 extern crate ruaft;
 
-use rand::{thread_rng, Rng};
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
 use std::sync::Arc;
 
+use rand::{thread_rng, Rng};
+
 mod config;
 
 #[test]
@@ -306,3 +309,132 @@ fn figure8_unreliable() -> config::Result<()> {
     drop(_guard);
     Ok(())
 }
+
+fn internal_churn(unreliable: bool) -> config::Result<()> {
+    const SERVERS: usize = 5;
+    let cfg = Arc::new(config::make_config(SERVERS, false));
+    let cfg_clone = cfg.clone();
+    let _guard = cfg_clone.deferred_cleanup();
+
+    if unreliable {
+        cfg.begin("Test (2C): unreliable churn");
+    } else {
+        cfg.begin("Test (2C): churn");
+    }
+
+    let stop = Arc::new(AtomicBool::new(false));
+    let mut handles = vec![];
+    for client_index in 0..3 {
+        let stop = stop.clone();
+        let cfg = cfg.clone();
+        let handle = std::thread::spawn(move || {
+            let mut cmds = vec![];
+            while !stop.load(Ordering::SeqCst) {
+                let cmd = thread_rng().gen();
+                let mut index = None;
+                for i in 0..SERVERS {
+                    if cfg.is_server_alive(i) {
+                        let start = cfg.leader_start(i, cmd);
+                        if start.is_some() {
+                            index = Some(i);
+                        }
+                    }
+                }
+
+                if let Some(index) = index {
+                    for millis in [10, 20, 50, 100, 200].iter() {
+                        let (cmd_index, cmd_committed) =
+                            // somehow the compiler cannot infer the error type.
+                            match cfg.committed_count(index) {
+                                Ok(t) => t,
+                                Err(e) => return Err(e),
+                            };
+                        if cmd_index > 0 {
+                            if cmd_committed == cmd {
+                                cmds.push(cmd);
+                            }
+                            // The contract we started might not get
+                        }
+                        config::sleep_millis(*millis);
+                    }
+                } else {
+                    config::sleep_millis(79 + client_index * 17);
+                }
+            }
+
+            Ok(cmds)
+        });
+        handles.push(handle);
+    }
+
+    for _ in 0..20 {
+        if thread_rng().gen_ratio(200, 1000) {
+            cfg.disconnect(thread_rng().gen_range(0, SERVERS));
+        }
+        if thread_rng().gen_ratio(500, 1000) {
+            let server = thread_rng().gen_range(0, SERVERS);
+            if !cfg.is_server_alive(server) {
+                cfg.start1(server)?;
+            }
+            cfg.connect(server);
+        }
+
+        if thread_rng().gen_ratio(200, 1000) {
+            let server = thread_rng().gen_range(0, SERVERS);
+            if cfg.is_server_alive(server) {
+                cfg.crash1(server);
+            }
+        }
+        config::sleep_millis(config::LONG_ELECTION_TIMEOUT_MILLIS / 10 * 7);
+    }
+
+    config::sleep_election_timeouts(1);
+    cfg.set_unreliable(false);
+    for i in 0..SERVERS {
+        if !cfg.is_server_alive(i) {
+            cfg.start1(i)?;
+        }
+        cfg.connect(i);
+    }
+
+    stop.store(true, Ordering::SeqCst);
+    let mut all_cmds = vec![];
+    for handle in handles {
+        let mut cmds = handle.join().expect("Client should not fail")?;
+        all_cmds.append(&mut cmds);
+    }
+
+    config::sleep_election_timeouts(1);
+
+    let last_cmd_index = cfg.one(thread_rng().gen(), SERVERS, true)?;
+    let mut consented = vec![];
+    for cmd_index in 1..last_cmd_index + 1 {
+        let cmd = cfg.wait(cmd_index, SERVERS, None)?;
+        let cmd = cmd.expect("There should always be a command");
+        consented.push(cmd);
+    }
+
+    for cmd in all_cmds {
+        assert!(
+            consented.contains(&cmd),
+            "Cmd {} not found in {:?}",
+            cmd,
+            consented
+        );
+    }
+
+    cfg.end();
+
+    drop(_guard);
+    Ok(())
+}
+
+#[test]
+fn reliable_churn() -> config::Result<()> {
+    internal_churn(false)
+}
+
+#[test]
+fn unreliable_churn() -> config::Result<()> {
+    internal_churn(true)
+}