Jelajahi Sumber

Add the RPC count test.

Jing Yang 5 tahun lalu
induk
melakukan
ecc3ec8d46
2 mengubah file dengan 159 tambahan dan 6 penghapusan
  1. 104 0
      tests/agreement_tests.rs
  2. 55 6
      tests/config/mod.rs

+ 104 - 0
tests/agreement_tests.rs

@@ -236,3 +236,107 @@ fn backup() -> config::Result<()> {
     drop(_guard);
     Ok(())
 }
+
+#[test]
+fn count() -> config::Result<()> {
+    const SERVERS: usize = 3;
+    let cfg = config::make_config(SERVERS, false);
+    let _guard = cfg.deferred_cleanup();
+
+    cfg.begin("Test (2B): RPC counts aren't too high");
+
+    cfg.check_one_leader()?;
+    let total = cfg.total_rpcs();
+    assert!(
+        total >= 1 && total <= 30,
+        "too many or few RPCs ({}) to elect initial leader",
+        total
+    );
+
+    let mut retries = 0;
+    let (success, total) = loop {
+        if retries == 5 {
+            break (false, 0);
+        }
+        if retries != 0 {
+            config::sleep_millis(3000);
+        }
+        retries += 1;
+
+        let leader = cfg.check_one_leader()?;
+        let start_total = cfg.total_rpcs();
+
+        const ITERS: usize = 10;
+        let (term, start_index) = match cfg.leader_start(leader, 1) {
+            Some(pair) => pair,
+            None => continue,
+        };
+
+        let mut cmds = vec![];
+        for i in 1..(ITERS + 2) {
+            let cmd: i32 = thread_rng().gen();
+            cmds.push(cmd);
+
+            let index = match cfg.leader_start(leader, cmd) {
+                Some((new_term, index)) => {
+                    if new_term == term {
+                        Some(index)
+                    } else {
+                        None
+                    }
+                }
+                None => None,
+            };
+            if let Some(index) = index {
+                assert_eq!(start_index + i, index, "start() failed");
+            } else {
+                retries = 0;
+                break;
+            }
+        }
+        if retries == 0 {
+            continue;
+        }
+        for i in 1..(ITERS + 1) {
+            let cmd = cfg.wait(start_index + i, SERVERS, Some(term))?;
+            if let Some(cmd) = cmd {
+                assert_eq!(
+                    cmd,
+                    cmds[i - 1],
+                    "wrong value {} committed for index {}; expected {:?}",
+                    cmd,
+                    start_index + i,
+                    cmds
+                )
+            } else {
+                retries = 0;
+                break;
+            }
+        }
+        if term != cfg.check_terms()?.expect("terms should be agreed on") {
+            retries = 0;
+        }
+        if retries == 0 {
+            continue;
+        }
+
+        let diff = cfg.total_rpcs() - start_total;
+        if diff > (ITERS + 1 + 3) * 3 {
+            panic!("too many RPCs ({}) for {} entries", diff, ITERS);
+        }
+
+        break (true, cfg.total_rpcs());
+    };
+
+    assert!(success, "term change too often");
+
+    config::sleep_election_timeouts(1);
+
+    let diff = cfg.total_rpcs() - total;
+    assert!(diff < 3 * 20, "too many RPCs ({}) for 1 second of idleness");
+
+    cfg.end();
+
+    drop(_guard);
+    Ok(())
+}

+ 55 - 6
tests/config/mod.rs

@@ -96,7 +96,7 @@ impl Config {
         Ok(())
     }
 
-    pub fn check_terms(&self) -> Result<()> {
+    pub fn check_terms(&self) -> Result<Option<usize>> {
         let mut term = None;
         let state = self.state.lock();
         for i in 0..self.server_count {
@@ -113,7 +113,8 @@ impl Config {
                 }
             }
         }
-        Ok(())
+        // Unwrap type Term into usize.
+        Ok(term.map(|term| term.0))
     }
 
     /// Returns the number of peers that committed at least `index` commands,
@@ -128,11 +129,12 @@ impl Config {
             }
             if log.committed_logs[i].len() > index {
                 let command = log.committed_logs[i][index];
-                if count > 0 {
-                    assert_eq!(
-                        command, cmd,
+                if count > 0 && command != cmd {
+                    bail!(
                         "committed values do not match: index {}, {}, {}",
-                        index, cmd, command
+                        index,
+                        cmd,
+                        command
                     )
                 }
                 count += 1;
@@ -142,6 +144,48 @@ impl Config {
         Ok((count, cmd))
     }
 
+    pub fn wait(
+        &self,
+        index: usize,
+        min_count: usize,
+        at_term: Option<usize>,
+    ) -> Result<Option<i32>> {
+        let mut sleep_time_mills = 10;
+        for _ in 0..30 {
+            let (count, _) = self.committed_count(index)?;
+            if count >= min_count {
+                break;
+            }
+            sleep_millis(sleep_time_mills);
+            if sleep_time_mills < 1000 {
+                sleep_time_mills <<= 1;
+            }
+
+            if let Some(at_term) = at_term {
+                let state = self.state.lock();
+                for raft in &state.rafts {
+                    if let Some(raft) = raft {
+                        let (term, _) = raft.get_state();
+                        if term.0 > at_term {
+                            return Ok(None);
+                        }
+                    }
+                }
+            }
+        }
+
+        let (count, cmd) = self.committed_count(index)?;
+        if count < min_count {
+            bail!(
+                "only {} decided for index {}; wanted {}",
+                count,
+                index,
+                min_count
+            )
+        }
+        Ok(Some(cmd))
+    }
+
     pub fn one(
         &self,
         cmd: i32,
@@ -259,6 +303,7 @@ impl Config {
         Ok(())
     }
 
+    /// Start a new command, returns (term, index).
     pub fn leader_start(
         &self,
         leader: usize,
@@ -273,6 +318,10 @@ impl Config {
             .unwrap()
     }
 
+    pub fn total_rpcs(&self) -> usize {
+        unlock(&self.network).get_total_rpc_count()
+    }
+
     pub fn end(&self) {}
 
     pub fn cleanup(&self) {