فهرست منبع

Initial election test skeleton.

Jing Yang 5 سال پیش
والد
کامیت
3975600a33
4 فایل تغییر یافته به همراه 180 افزوده شده و 6 حذف شده
  1. src/lib.rs (+3 −3)
  2. src/rpcs.rs (+7 −3)
  3. tests/config/mod.rs (+135 −0)
  4. tests/election_tests.rs (+35 −0)

+ 3 - 3
src/lib.rs

@@ -18,7 +18,7 @@ use crate::utils::{retry_rpc, DropGuard};
 use crossbeam_utils::sync::WaitGroup;
 
 pub mod rpcs;
-mod utils;
+pub mod utils;
 
 #[derive(Eq, PartialEq)]
 enum State {
@@ -514,14 +514,14 @@ impl Raft {
     fn schedule_heartbeats(&self, interval: Duration) {
         for (peer_index, rpc_client) in self.peers.iter().enumerate() {
             if peer_index != self.me.0 {
-                // Interval and rf are now owned by the outer async function.
-                let mut interval = tokio::time::interval(interval);
+                // rf is now owned by the outer async function.
                 let rf = self.inner_state.clone();
                 // RPC client must be cloned into the outer async function.
                 let rpc_client = rpc_client.clone();
                 // Shutdown signal.
                 let keep_running = self.keep_running.clone();
                 self.thread_pool.spawn(async move {
+                    let mut interval = tokio::time::interval(interval);
                     while keep_running.load(Ordering::SeqCst) {
                         interval.tick().await;
                         if let Some(args) = Self::build_heartbeat(&rf) {

+ 7 - 3
src/rpcs.rs

@@ -47,6 +47,10 @@ pub(crate) const APPEND_ENTRIES_RPC: &str = "Raft.AppendEntries";
 pub struct RpcClient(Client);
 
 impl RpcClient {
+    pub fn new(client: Client) -> Self {
+        Self(client)
+    }
+
     pub(crate) async fn call_request_vote(
         self: Self,
         request: RequestVoteArgs,
@@ -79,10 +83,10 @@ impl RpcClient {
     }
 }
 
-pub(crate) fn register_server<S: AsRef<str>>(
+pub fn register_server<S: AsRef<str>>(
     raft: Arc<Raft>,
     name: S,
-    network: Arc<Mutex<Network>>,
+    network: &Mutex<Network>,
 ) -> std::io::Result<()> {
     let mut network =
         network.lock().expect("Network lock should not be poisoned");
@@ -127,7 +131,7 @@ mod tests {
                 0,
                 |_, _| {},
             ));
-            register_server(raft, name, network.clone())?;
+            register_server(raft, name, network.as_ref())?;
             client
         };
 

+ 135 - 0
tests/config/mod.rs

@@ -0,0 +1,135 @@
+extern crate labrpc;
+
+use parking_lot::Mutex;
+use ruaft::rpcs::register_server;
+use ruaft::{Raft, RpcClient};
+use std::sync::Arc;
+
+struct ConfigState {
+    rafts: Vec<Option<Raft>>,
+}
+
+pub struct Config {
+    network: Arc<std::sync::Mutex<labrpc::Network>>,
+    server_count: usize,
+    state: Mutex<ConfigState>,
+}
+
+impl Config {
+    fn server_name(i: usize) -> String {
+        format!("ruaft-server-{}", i)
+    }
+
+    fn client_name(client: usize, server: usize) -> String {
+        format!("ruaft-server-{}-to-{}", client, server)
+    }
+
+    pub fn begin<S: std::fmt::Display>(&self, msg: S) {
+        eprintln!("{}", msg);
+    }
+
+    pub fn check_one_leader(&self) -> std::io::Result<()> {
+        Ok(())
+    }
+
+    pub fn check_terms(&self) -> std::io::Result<()> {
+        Ok(())
+    }
+
+    pub fn disconnect(&self, index: usize) {
+        let mut network = unlock(&self.network);
+        network.remove_server(&Self::server_name(index));
+
+        // Outgoing clients.
+        for j in 0..self.server_count {
+            network.set_enable_client(Self::client_name(index, j), false)
+        }
+
+        // Incoming clients.
+        for j in 0..self.server_count {
+            network.set_enable_client(Self::client_name(j, index), false);
+        }
+    }
+
+    pub fn crash1(&mut self, index: usize) {
+        self.disconnect(index);
+        unlock(self.network.as_ref()).remove_server(Self::server_name(index));
+        let raft = {
+            let mut state = self.state.lock();
+            state.rafts[index].take()
+        };
+        if let Some(raft) = raft {
+            raft.kill();
+        }
+    }
+
+    pub fn start1(&mut self, index: usize) -> std::io::Result<()> {
+        if self.state.lock().rafts[index].is_some() {
+            self.crash1(index);
+        }
+
+        let mut clients = vec![];
+        {
+            let mut network = unlock(&self.network);
+            for j in 0..self.server_count {
+                clients.push(RpcClient::new(network.make_client(
+                    Self::client_name(index, j),
+                    Self::server_name(j),
+                )))
+            }
+        }
+
+        let raft = Raft::new(clients, index, |_, _| {});
+        self.state.lock().rafts[index].replace(raft.clone());
+
+        let raft = Arc::new(raft);
+        register_server(raft, Self::server_name(index), self.network.as_ref())
+    }
+
+    pub fn end(&self) {}
+
+    pub fn cleanup(&self) {
+        for raft in &mut self.state.lock().rafts {
+            if let Some(raft) = raft.take() {
+                raft.kill();
+            }
+        }
+    }
+}
+
+fn unlock<T>(locked: &std::sync::Mutex<T>) -> std::sync::MutexGuard<T> {
+    locked.lock().expect("Unlocking network should not fail")
+}
+
+pub fn make_config(server_count: usize, unreliable: bool) -> Config {
+    let network = labrpc::Network::run_daemon();
+    {
+        let mut unlocked_network = unlock(&network);
+        unlocked_network.set_reliable(!unreliable);
+        unlocked_network.set_long_delays(true);
+    }
+
+    let state = Mutex::new(ConfigState {
+        rafts: vec![None; server_count],
+    });
+    let mut cfg = Config {
+        network,
+        server_count,
+        state,
+    };
+
+    for i in 0..server_count {
+        cfg.start1(i).expect("Starting server should not fail");
+    }
+
+    cfg
+}
+
+pub fn sleep_millis(mills: u64) {
+    std::thread::sleep(std::time::Duration::from_millis(mills))
+}
+
+const LONG_ELECTION_TIMEOUT_MILLIS: u64 = 1000;
+pub fn sleep_election_timeouts(count: u64) {
+    sleep_millis(LONG_ELECTION_TIMEOUT_MILLIS * count)
+}

+ 35 - 0
tests/election_tests.rs

@@ -0,0 +1,35 @@
+extern crate ruaft;
+
+mod config;
+
+#[test]
+fn initial_election() -> std::io::Result<()> {
+    const SERVERS: usize = 3;
+    let cfg = config::make_config(SERVERS, false);
+    let guard = ruaft::utils::DropGuard::new(|| cfg.cleanup());
+
+    cfg.begin("Test (2A): initial election");
+
+    cfg.check_one_leader()?;
+
+    config::sleep_millis(50);
+
+    let first_term = cfg.check_terms()?;
+    config::sleep_election_timeouts(2);
+
+    let second_term = cfg.check_terms()?;
+
+    if first_term != second_term {
+        eprintln!("Warning: term change even though there were no failures");
+    }
+
+    cfg.check_one_leader()?;
+
+    cfg.end();
+
+    drop(guard);
+    Ok(())
+}
+
+#[test]
+fn re_election() {}