Просмотр исходного кода

Check one leader fails at initial election test.

Jing Yang 5 лет назад
Родитель
Сommit
d637710470
4 измененных файлов с 73 добавлено и 14 удалено
  1. 1 0
      Cargo.toml
  2. 13 5
      src/lib.rs
  3. 55 8
      tests/config/mod.rs
  4. 4 1
      tests/election_tests.rs

+ 1 - 0
Cargo.toml

@@ -18,4 +18,5 @@ serde_derive = "1.0.116"
 tokio = { version = "0.2.22", features = ["rt-threaded", "sync", "time"] }
 
 [dev-dependencies]
+anyhow = "1.0"
 futures = { version = "0.3.5", features = ["thread-pool"] }

+ 13 - 5
src/lib.rs

@@ -31,7 +31,7 @@ enum State {
 #[derive(
     Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize,
 )]
-pub struct Term(usize);
+pub struct Term(pub usize);
 #[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
 struct Peer(usize);
 
@@ -541,7 +541,7 @@ impl Raft {
     ) -> Option<AppendEntriesArgs> {
         let rf = rf.lock();
 
-        if rf.state == State::Leader {
+        if rf.is_leader() {
             return None;
         }
 
@@ -620,8 +620,7 @@ impl Raft {
                     rf.next_index[peer_index] = match_index + 1;
                     if match_index > rf.match_index[peer_index] {
                         rf.match_index[peer_index] = match_index;
-                        if rf.state == State::Leader && rf.current_term == term
-                        {
+                        if rf.is_leader() && rf.current_term == term {
                             let mut matched = rf.match_index.to_vec();
                             let mid = matched.len() / 2 + 1;
                             matched.sort();
@@ -749,7 +748,7 @@ impl Raft {
     pub fn start(&self, command: Command) -> Option<(Term, Index)> {
         let mut rf = self.inner_state.lock();
         let term = rf.current_term;
-        if rf.state != State::Leader {
+        if !rf.is_leader() {
             return None;
         }
 
@@ -785,6 +784,11 @@ impl Raft {
             ));
         self.inner_state.lock().persist();
     }
+
+    pub fn get_state(&self) -> (Term, bool) {
+        let state = self.inner_state.lock();
+        (state.current_term, state.is_leader())
+    }
 }
 
 impl RaftState {
@@ -801,6 +805,10 @@ impl RaftState {
         assert!(len > 0, "There should always be at least one entry in log");
         (len - 1, self.log.last().unwrap().term)
     }
+
+    fn is_leader(&self) -> bool {
+        self.state == State::Leader
+    }
 }
 
 const HEARTBEAT_INTERVAL_MILLIS: u64 = 150;

+ 55 - 8
tests/config/mod.rs

@@ -1,12 +1,13 @@
-extern crate labrpc;
-
 use parking_lot::Mutex;
+use rand::{thread_rng, Rng};
 use ruaft::rpcs::register_server;
 use ruaft::{Raft, RpcClient};
+use std::collections::HashMap;
 use std::sync::Arc;
 
 struct ConfigState {
     rafts: Vec<Option<Raft>>,
+    connected: Vec<bool>,
 }
 
 pub struct Config {
@@ -15,6 +16,8 @@ pub struct Config {
     state: Mutex<ConfigState>,
 }
 
+pub use anyhow::Result;
+
 impl Config {
     fn server_name(i: usize) -> String {
         format!("ruaft-server-{}", i)
@@ -28,31 +31,74 @@ impl Config {
         eprintln!("{}", msg);
     }
 
-    pub fn check_one_leader(&self) -> std::io::Result<()> {
-        Ok(())
+    pub fn check_one_leader(&self) -> Result<usize> {
+        for _ in 0..10 {
+            let millis = 450 + thread_rng().gen_range(0, 100);
+            sleep_millis(millis);
+
+            let mut leaders = HashMap::new();
+            let state = self.state.lock();
+            for i in 0..self.server_count {
+                if state.connected[i] {
+                    if let Some(raft) = &state.rafts[i] {
+                        let (term, is_leader) = raft.get_state();
+                        if is_leader {
+                            leaders.entry(term.0).or_insert(vec![]).push(i)
+                        }
+                    }
+                }
+            }
+
+            let mut last_term_with_leader = 0;
+            let mut last_leader = 0;
+            for (term, leaders) in leaders {
+                if leaders.len() > 1 {
+                    bail!("term {} has {} (>1) leaders", term, leaders.len());
+                }
+                if term > last_term_with_leader {
+                    last_term_with_leader = term;
+                    last_leader = leaders[0];
+                }
+            }
+
+            if last_term_with_leader != 0 {
+                return Ok(last_leader);
+            }
+        }
+        Err(anyhow!("expected one leader, got none"))
     }
 
     pub fn check_terms(&self) -> std::io::Result<()> {
         Ok(())
     }
 
+    pub fn connect(&self, index: usize) {
+        self.set_connect(index, true);
+    }
+
     pub fn disconnect(&self, index: usize) {
+        self.set_connect(index, false);
+    }
+
+    pub fn set_connect(&self, index: usize, yes: bool) {
+        self.state.lock().connected[index] = yes;
+
         let mut network = unlock(&self.network);
-        network.remove_server(&Self::server_name(index));
 
         // Outgoing clients.
         for j in 0..self.server_count {
-            network.set_enable_client(Self::client_name(index, j), false)
+            network.set_enable_client(Self::client_name(index, j), yes)
         }
 
         // Incoming clients.
         for j in 0..self.server_count {
-            network.set_enable_client(Self::client_name(j, index), false);
+            network.set_enable_client(Self::client_name(j, index), yes);
         }
     }
 
     pub fn crash1(&mut self, index: usize) {
         self.disconnect(index);
+
         unlock(self.network.as_ref()).remove_server(Self::server_name(index));
         let raft = {
             let mut state = self.state.lock();
@@ -90,7 +136,7 @@ impl Config {
 
     pub fn cleanup(&self) {
         for raft in &mut self.state.lock().rafts {
-            if let Some(raft) = raft.take() {
+            if let Some(_raft) = raft.take() {
                 raft.kill();
             }
         }
@@ -111,6 +157,7 @@ pub fn make_config(server_count: usize, unreliable: bool) -> Config {
 
     let state = Mutex::new(ConfigState {
         rafts: vec![None; server_count],
+        connected: vec![true; server_count],
     });
     let mut cfg = Config {
         network,

+ 4 - 1
tests/election_tests.rs

@@ -1,9 +1,12 @@
+extern crate labrpc;
 extern crate ruaft;
+#[macro_use]
+extern crate anyhow;
 
 mod config;
 
 #[test]
-fn initial_election() -> std::io::Result<()> {
+fn initial_election() -> config::Result<()> {
     const SERVERS: usize = 3;
     let cfg = config::make_config(SERVERS, false);
     let guard = ruaft::utils::DropGuard::new(|| cfg.cleanup());