Quellcode durchsuchen

Add two lineariability tests.

Jing Yang vor 4 Jahren
Ursprung
Commit
f93a1fa8f7

+ 1 - 0
kvraft/Cargo.toml

@@ -11,6 +11,7 @@ labrpc = { path = "../../labrpc" }
 parking_lot = "0.11.1"
 rand = "0.8"
 ruaft = { path = "../" }
+linearizability = { path = "../linearizability" }
 scopeguard = "1.1.0"
 serde = "1.0.116"
 serde_derive = "1.0.116"

+ 99 - 8
kvraft/src/testing_utils/generic_test.rs

@@ -1,10 +1,13 @@
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::thread::JoinHandle;
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
+use parking_lot::Mutex;
 use rand::{thread_rng, Rng};
 
+use linearizability::{KvInput, KvModel, KvOp, KvOutput, Operation};
+
 use crate::testing_utils::config::{
     make_config, sleep_election_timeouts, sleep_millis, Config,
     LONG_ELECTION_TIMEOUT_MILLIS,
@@ -64,6 +67,58 @@ fn appending_client(
     (op_count, last)
 }
 
+fn linearizability_client(
+    index: usize,
+    client_count: usize,
+    mut clerk: Clerk,
+    stop: Arc<AtomicBool>,
+    ops: Arc<Mutex<Vec<Operation<KvInput, KvOutput>>>>,
+) -> (usize, String) {
+    let mut op_count = 0usize;
+    while !stop.load(Ordering::Acquire) {
+        let key = thread_rng().gen_range(0..client_count).to_string();
+        let value = format!("({}, {}), ", index, op_count);
+        let call_time = Instant::now();
+        let call_op;
+        let return_op;
+        if thread_rng().gen_ratio(500, 1000) {
+            clerk.append(&key, &value);
+            call_op = KvInput {
+                op: KvOp::Append,
+                key,
+                value,
+            };
+            return_op = KvOutput::default();
+        } else if thread_rng().gen_ratio(100, 1000) {
+            clerk.put(&key, &value);
+            call_op = KvInput {
+                op: KvOp::Put,
+                key,
+                value,
+            };
+            return_op = KvOutput::default();
+        } else {
+            let result = clerk.get(&key).unwrap_or_default();
+            call_op = KvInput {
+                op: KvOp::Get,
+                key,
+                value: Default::default(),
+            };
+            return_op = result;
+        }
+        let return_time = Instant::now();
+        ops.lock().push(Operation {
+            call_op,
+            call_time,
+            return_op,
+            return_time,
+        });
+
+        op_count += 1;
+    }
+    (op_count, String::new())
+}
+
 const PARTITION_MAX_DELAY_MILLIS: u64 = 200;
 
 fn run_partition(cfg: Arc<Config>, stop: Arc<AtomicBool>) {
@@ -85,6 +140,7 @@ pub struct GenericTestParams {
     pub crash: bool,
     pub maxraftstate: Option<usize>,
     pub min_ops: Option<usize>,
+    pub test_linearizability: bool,
 }
 
 pub fn generic_test(test_params: GenericTestParams) {
@@ -95,15 +151,17 @@ pub fn generic_test(test_params: GenericTestParams) {
         crash,
         maxraftstate,
         min_ops,
+        test_linearizability,
     } = test_params;
     let maxraftstate = maxraftstate.unwrap_or(usize::MAX);
     let min_ops = min_ops.unwrap_or(10);
-    const SERVERS: usize = 5;
-    let cfg = Arc::new(make_config(SERVERS, unreliable, maxraftstate));
+    let servers: usize = if test_linearizability { 7 } else { 5 };
+    let cfg = Arc::new(make_config(servers, unreliable, maxraftstate));
     defer!(cfg.clean_up());
 
     cfg.begin("");
     let mut clerk = cfg.make_clerk();
+    let ops = Arc::new(Mutex::new(vec![]));
 
     const ROUNDS: usize = 3;
     for _ in 0..ROUNDS {
@@ -114,9 +172,20 @@ pub fn generic_test(test_params: GenericTestParams) {
 
         let config = cfg.clone();
         let clients_stop_clone = clients_stop.clone();
+        let ops_clone = ops.clone();
         let spawn_client_results = std::thread::spawn(move || {
             spawn_clients(config, clients, move |index: usize, clerk: Clerk| {
-                appending_client(index, clerk, clients_stop_clone.clone())
+                if !test_linearizability {
+                    appending_client(index, clerk, clients_stop_clone.clone())
+                } else {
+                    linearizability_client(
+                        index,
+                        clients,
+                        clerk,
+                        clients_stop_clone.clone(),
+                        ops_clone.clone(),
+                    )
+                }
             })
         });
 
@@ -157,10 +226,12 @@ pub fn generic_test(test_params: GenericTestParams) {
         for (index, client_result) in client_results.into_iter().enumerate() {
             let (op_count, last_result) =
                 client_result.join().expect("Client should never fail");
-            let real_result = clerk
-                .get(index.to_string())
-                .expect(&format!("Key {} should exist.", index));
-            assert_eq!(real_result, last_result);
+            if !last_result.is_empty() {
+                let real_result = clerk
+                    .get(index.to_string())
+                    .expect(&format!("Key {} should exist.", index));
+                assert_eq!(real_result, last_result);
+            }
             eprintln!("Client {} committed {} operations", index, op_count);
             assert!(
                 op_count >= min_ops,
@@ -173,4 +244,24 @@ pub fn generic_test(test_params: GenericTestParams) {
     }
 
     cfg.end();
+
+    if test_linearizability {
+        let ops: &'static Vec<Operation<KvInput, KvOutput>> =
+            Box::leak(Box::new(
+                Arc::try_unwrap(ops)
+                    .expect("No one should be holding ops")
+                    .into_inner(),
+            ));
+        let start = Instant::now();
+        eprintln!("Searching for linearization arrangements ...");
+        assert!(
+            linearizability::check_operations_timeout::<KvModel>(&ops, None),
+            "History {:?} is not linearizable,",
+            ops,
+        );
+        eprintln!(
+            "Searching for linearization arrangements done after {:?}.",
+            start.elapsed()
+        );
+    }
 }

+ 13 - 0
kvraft/tests/service_test.rs

@@ -247,3 +247,16 @@ fn persist_partition_unreliable() {
         ..Default::default()
     });
 }
+
+#[test]
+fn linearizability() {
+    generic_test(GenericTestParams {
+        clients: 15,
+        unreliable: true,
+        partition: true,
+        crash: true,
+        maxraftstate: None,
+        min_ops: Some(0),
+        test_linearizability: true,
+    });
+}

+ 12 - 6
linearizability/src/lib.rs

@@ -1,10 +1,12 @@
 use std::collections::HashSet;
+use std::fmt::Debug;
 use std::time::{Duration, Instant};
 
 use bit_set::BitSet;
 
 pub use model::KvInput;
 pub use model::KvModel;
+pub use model::KvOp;
 pub use model::KvOutput;
 pub use model::Model;
 
@@ -13,26 +15,27 @@ use crate::offset_linked_list::{NodeRef, OffsetLinkedList};
 mod model;
 mod offset_linked_list;
 
-pub struct Operation<C, R> {
+#[derive(Debug)]
+pub struct Operation<C: Debug, R: Debug> {
     pub call_op: C,
     pub call_time: Instant,
     pub return_op: R,
     pub return_time: Instant,
 }
 
-enum EntryKind<'a, C, R> {
+enum EntryKind<'a, C: Debug, R: Debug> {
     Call(&'a Operation<C, R>),
     Return,
 }
 
-struct Entry<'a, C, R> {
+struct Entry<'a, C: Debug, R: Debug> {
     kind: EntryKind<'a, C, R>,
     id: usize,
     time: Instant,
     other: usize,
 }
 
-fn operation_to_entries<'a, C, R>(
+fn operation_to_entries<'a, C: Debug, R: Debug>(
     ops: &[&'a Operation<C, R>],
 ) -> Vec<Entry<'a, C, R>> {
     let mut result = vec![];
@@ -121,7 +124,9 @@ fn check_history<T: Model>(
                 curr = prev;
                 flag = prev_flag;
 
+                list.unlift(NodeRef(list.get(leg).other));
                 list.unlift(leg);
+                leg = list.succ(leg).expect("There should be another element");
             }
         }
     }
@@ -154,10 +159,11 @@ where
 
 #[cfg(test)]
 mod tests {
-    use crate::{check_operations_timeout, Model, Operation};
     use std::time::{Duration, Instant};
 
-    #[derive(Clone, Eq, PartialEq, Hash)]
+    use crate::{check_operations_timeout, Model, Operation};
+
+    #[derive(Clone, Debug, Eq, PartialEq, Hash)]
     struct CountingModel {
         base: usize,
         cnt: usize,

+ 8 - 6
linearizability/src/model.rs

@@ -2,9 +2,11 @@ use std::collections::HashMap;
 
 use crate::Operation;
 
-pub trait Model: std::cmp::Eq + std::clone::Clone + std::hash::Hash {
-    type Input;
-    type Output;
+pub trait Model:
+    std::cmp::Eq + std::clone::Clone + std::hash::Hash + std::fmt::Debug
+{
+    type Input: std::fmt::Debug;
+    type Output: std::fmt::Debug;
 
     fn create() -> Self;
     fn partition(
@@ -17,14 +19,14 @@ pub trait Model: std::cmp::Eq + std::clone::Clone + std::hash::Hash {
     fn step(&mut self, input: &Self::Input, output: &Self::Output) -> bool;
 }
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub enum KvOp {
     Get,
     Put,
     Append,
 }
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct KvInput {
     pub op: KvOp,
     pub key: String,
@@ -34,7 +36,7 @@ pub type KvOutput = String;
 
 unsafe impl Sync for KvInput {}
 
-#[derive(Clone, Eq, PartialEq, Hash)]
+#[derive(Clone, Debug, Eq, PartialEq, Hash)]
 pub struct KvModel {
     expected_output: String,
 }

+ 3 - 0
linearizability/src/offset_linked_list.rs

@@ -97,6 +97,7 @@ impl<T> OffsetLinkedList<T> {
         unsafe { &*self.nodes[index].data.as_ptr() }
     }
 
+    #[allow(dead_code)]
     pub fn prev(&self, index: NodeRef) -> Option<NodeRef> {
         let index = self.offset_index(index);
         let succ = self.nodes[index].prev;
@@ -112,6 +113,7 @@ impl<T> OffsetLinkedList<T> {
         NodeRef::from_index(self.nodes[Self::HEAD].succ)
     }
 
+    #[allow(dead_code)]
     pub fn last(&self) -> Option<NodeRef> {
         NodeRef::from_index(self.nodes[Self::HEAD].prev)
     }
@@ -124,6 +126,7 @@ impl<T> OffsetLinkedList<T> {
         &self.nodes[index]
     }
 
+    #[allow(dead_code)]
     pub fn iter(&self) -> Iter<'_, T> {
         Iter {
             list: self,

+ 12 - 0
tests/snapshot_tests.rs

@@ -146,3 +146,15 @@ fn snapshot_unreliable_recover_partition() {
         ..Default::default()
     })
 }
+#[test]
+fn linearizability() {
+    generic_test(GenericTestParams {
+        clients: 15,
+        unreliable: true,
+        partition: true,
+        crash: true,
+        maxraftstate: Some(1000),
+        min_ops: Some(0),
+        test_linearizability: true,
+    });
+}