Prechádzať zdrojové kódy

Implement check_log_size() in kvraft test setup.

Jing Yang 4 rokov pred
rodič
commit
6cf983ffd5

+ 1 - 1
kvraft/src/snapshot_holder.rs

@@ -25,7 +25,7 @@ impl<T> SnapshotHolder<T> {
                 Err(pos) => {
                     assert!(pos == 0 || requests[pos - 1].0 < min_index);
                     assert!(
-                        pos == requests.len() - 1
+                        pos + 1 >= requests.len()
                             || requests[pos + 1].0 > min_index
                     );
                     let condvar = Arc::new(Condvar::new());

+ 23 - 2
kvraft/src/testing_utils/config.rs

@@ -7,7 +7,7 @@ use rand::seq::SliceRandom;
 use rand::thread_rng;
 
 use ruaft::rpcs::register_server;
-use ruaft::RpcClient;
+use ruaft::{Persister, RpcClient};
 
 use crate::client::Clerk;
 use crate::server::KVServer;
@@ -128,7 +128,7 @@ impl Config {
         }
     }
 
-    fn network_partition(&self, part_one: &[usize], part_two: &[usize]) {
+    pub fn network_partition(&self, part_one: &[usize], part_two: &[usize]) {
         let mut network = self.network.lock();
         Self::set_connect(&mut network, part_one, part_two, false);
         Self::set_connect(&mut network, part_two, part_one, false);
@@ -250,6 +250,27 @@ impl Config {
     }
 }
 
+impl Config {
+    pub fn check_log_size(&self, upper: usize) -> Result<()> {
+        let mut over_limits = String::new();
+        for (index, p) in self.storage.lock().all().iter().enumerate() {
+            if p.state_size() > upper {
+                let str =
+                    format!(" (index {}, size {})", index, p.state_size());
+                over_limits.push_str(&str);
+            }
+        }
+        if over_limits.len() != 0 {
+            anyhow::bail!(
+                "logs were not trimmed to {}: {}",
+                upper,
+                over_limits
+            );
+        }
+        Ok(())
+    }
+}
+
 pub fn make_config(
     server_count: usize,
     unreliable: bool,

+ 4 - 0
kvraft/src/testing_utils/memory_persister.rs

@@ -74,4 +74,8 @@ impl MemoryStorage {
         self.state_vec[index] = persister.clone();
         persister
     }
+
+    pub fn all(&self) -> &Vec<Arc<MemoryPersister>> {
+        &self.state_vec
+    }
 }

+ 32 - 0
tests/snapshot_tests.rs

@@ -29,4 +29,36 @@ fn install_snapshot_rpc() {
         sleep_election_timeouts(1);
         clerk.put("b", "B");
     }
+
+    cfg.check_log_size(MAX_RAFT_STATE * 2).expect("Hi");
+
+    // Swap majority and minority.
+    let (mut majority, mut minority) = (minority, majority);
+    majority.push(
+        minority
+            .pop()
+            .expect("There should be at least one server in the majority."),
+    );
+    cfg.network_partition(&majority, &minority);
+
+    {
+        let mut clerk = cfg.make_limited_clerk(&majority);
+        clerk.put("c", "C");
+        clerk.put("d", "D");
+        assert_eq!(clerk.get(KEY), Some("A".to_owned()));
+        assert_eq!(clerk.get("b"), Some("B".to_owned()));
+        assert_eq!(clerk.get("c"), Some("C".to_owned()));
+        assert_eq!(clerk.get("d"), Some("D".to_owned()));
+        assert_eq!(clerk.get("1"), Some("1".to_owned()));
+        assert_eq!(clerk.get("49"), Some("49".to_owned()));
+    }
+
+    cfg.connect_all();
+    clerk.put("e", "E");
+    assert_eq!(clerk.get("c"), Some("C".to_owned()));
+    assert_eq!(clerk.get("e"), Some("E".to_owned()));
+    assert_eq!(clerk.get("1"), Some("1".to_owned()));
+    assert_eq!(clerk.get("49"), Some("49".to_owned()));
+
+    cfg.end();
 }