Sfoglia il codice sorgente

Implment linearizability module.

Jing Yang 4 anni fa
parent
commit
a5806e1874

+ 2 - 1
Cargo.toml

@@ -30,5 +30,6 @@ kvraft = { path = "kvraft" }
 
 [workspace]
 members = [
-    "kvraft"
+    "kvraft",
+    "linearizability",
 ]

+ 8 - 0
linearizability/Cargo.toml

@@ -0,0 +1,8 @@
+[package]
+name = "linearizability"
+version = "0.1.0"
+edition = "2018"
+
+[dependencies]
+anyhow = "1.0"
+bit-set = "0.5"

+ 213 - 0
linearizability/src/lib.rs

@@ -0,0 +1,213 @@
+use std::collections::HashSet;
+use std::time::Duration;
+
+use bit_set::BitSet;
+
+pub use model::KvInput;
+pub use model::KvModel;
+pub use model::KvOutput;
+pub use model::Model;
+
+use crate::offset_linked_list::{NodeRef, OffsetLinkedList};
+
+mod model;
+mod offset_linked_list;
+
+pub struct Operation<C, R> {
+    call_op: C,
+    call_time: u64,
+    return_op: R,
+    return_time: u64,
+}
+
+enum EntryKind<'a, C, R> {
+    Call(&'a Operation<C, R>),
+    Return,
+}
+
+struct Entry<'a, C, R> {
+    kind: EntryKind<'a, C, R>,
+    id: usize,
+    time: u64,
+    other: usize,
+}
+
+fn operation_to_entries<'a, C, R>(
+    ops: &[&'a Operation<C, R>],
+) -> Vec<Entry<'a, C, R>> {
+    let mut result = vec![];
+    for op in ops {
+        let id = result.len() >> 1;
+        result.push(Entry {
+            kind: EntryKind::Return,
+            id,
+            time: op.return_time,
+            other: 0,
+        });
+        result.push(Entry {
+            kind: EntryKind::Call(op),
+            id,
+            time: op.call_time,
+            other: 0,
+        });
+    }
+    result.sort_by_cached_key(|e| e.time);
+    let mut this = vec![0; ops.len()];
+    let mut that = vec![0; ops.len()];
+    for (index, entry) in result.iter().enumerate() {
+        match entry.kind {
+            EntryKind::Call(_) => this[entry.id] = index,
+            EntryKind::Return => that[entry.id] = index,
+        }
+    }
+    for i in 0..ops.len() {
+        result[this[i]].other = that[i];
+        result[that[i]].other = this[i];
+    }
+    result
+}
+
+fn check_history<T: Model>(
+    ops: &[&Operation<<T as Model>::Input, <T as Model>::Output>],
+) -> bool {
+    let entries = operation_to_entries(ops);
+    let mut list = OffsetLinkedList::create(entries);
+
+    let mut all = HashSet::new();
+    let mut stack = vec![];
+
+    let mut flag = BitSet::new();
+    let mut leg = list.first().expect("Linked list should not be empty");
+    let mut curr = T::create();
+    while !list.is_empty() {
+        let entry = list.get(leg);
+        let other = NodeRef(entry.other);
+        match entry.kind {
+            EntryKind::Call(ops) => {
+                let mut next = curr.clone();
+                if next.step(&ops.call_op, &ops.return_op) {
+                    let mut next_flag = flag.clone();
+                    next_flag.insert(entry.id);
+                    if all.insert((next_flag.clone(), next.clone())) {
+                        std::mem::swap(&mut curr, &mut next);
+                        std::mem::swap(&mut flag, &mut next_flag);
+                        stack.push((leg, next, next_flag));
+
+                        list.lift(leg);
+                        list.lift(other);
+
+                        if let Some(first) = list.first() {
+                            leg = first;
+                        } else {
+                            break;
+                        }
+                    } else {
+                        leg = list
+                            .succ(leg)
+                            .expect("There should be another element");
+                    }
+                } else {
+                    leg = list
+                        .succ(leg)
+                        .expect("There should be another element");
+                }
+            }
+            EntryKind::Return => {
+                if stack.is_empty() {
+                    return false;
+                }
+                let (prev_leg, prev, prev_flag) = stack.pop().unwrap();
+                leg = prev_leg;
+                curr = prev;
+                flag = prev_flag;
+
+                list.unlift(leg);
+            }
+        }
+    }
+    true
+}
+
+pub fn check_operations_timeout<T: Model>(
+    history: &'static [Operation<<T as Model>::Input, <T as Model>::Output>],
+    _: Option<Duration>,
+) -> bool
+where
+    <T as Model>::Input: Sync,
+    <T as Model>::Output: Sync,
+{
+    let mut results = vec![];
+    for sub_history in T::partition(history) {
+        results
+            .push(std::thread::spawn(move || check_history::<T>(&sub_history)));
+    }
+    let mut failed = vec![];
+    for (index, result) in results.into_iter().enumerate() {
+        let result = result.join().expect("Search thread should never panic");
+        if !result {
+            eprintln!("Partition {} failed.", index);
+            failed.push(index);
+        }
+    }
+    failed.is_empty()
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::{check_operations_timeout, Model, Operation};
+
+    #[derive(Clone, Eq, PartialEq, Hash)]
+    struct CountingModel {
+        base: usize,
+        cnt: usize,
+    }
+
+    impl Model for CountingModel {
+        type Input = usize;
+        type Output = usize;
+
+        fn create() -> Self {
+            Self { base: 0, cnt: 0 }
+        }
+
+        fn step(&mut self, input: &Self::Input, output: &Self::Output) -> bool {
+            if self.base == 0 && *input != 0 && *output == 1 {
+                self.base = *input;
+                self.cnt = 1;
+                true
+            } else if self.base == *input && self.cnt + 1 == *output {
+                self.cnt += 1;
+                true
+            } else {
+                false
+            }
+        }
+    }
+    #[test]
+    fn no_accept() {
+        let ops = Box::leak(Box::new(vec![]));
+        for i in 0..4 {
+            ops.push(Operation {
+                call_op: 0usize,
+                call_time: 0,
+                return_op: i as usize,
+                return_time: i,
+            });
+        }
+        assert!(!check_operations_timeout::<CountingModel>(ops, None));
+    }
+
+    #[test]
+    fn accept() {
+        let mut ops = Box::leak(Box::new(vec![]));
+        for i in 0..4 {
+            ops.push(Operation {
+                call_op: 1usize,
+                call_time: i * 2,
+                return_op: (i + 1) as usize,
+                return_time: i + 4,
+            });
+        }
+        assert!(check_operations_timeout::<CountingModel>(ops, None));
+    }
+}

+ 80 - 0
linearizability/src/model.rs

@@ -0,0 +1,80 @@
+use std::collections::HashMap;
+
+use crate::Operation;
+
+pub trait Model: std::cmp::Eq + std::clone::Clone + std::hash::Hash {
+    type Input;
+    type Output;
+
+    fn create() -> Self;
+    fn partition(
+        history: &[Operation<Self::Input, Self::Output>],
+    ) -> Vec<Vec<&Operation<Self::Input, Self::Output>>> {
+        let history: Vec<&Operation<Self::Input, Self::Output>> =
+            history.iter().map(|e| e).collect();
+        return vec![history];
+    }
+    fn step(&mut self, input: &Self::Input, output: &Self::Output) -> bool;
+}
+
+#[derive(Clone)]
+pub enum KvOp {
+    Get,
+    Put,
+    Append,
+}
+
+#[derive(Clone)]
+pub struct KvInput {
+    op: KvOp,
+    key: String,
+    value: String,
+}
+pub type KvOutput = String;
+
+unsafe impl Sync for KvInput {}
+
+#[derive(Clone, Eq, PartialEq, Hash)]
+pub struct KvModel {
+    expected_output: String,
+}
+
+impl Model for KvModel {
+    type Input = KvInput;
+    type Output = KvOutput;
+
+    fn create() -> Self {
+        KvModel {
+            expected_output: String::new(),
+        }
+    }
+
+    fn partition(
+        history: &[Operation<KvInput, KvOutput>],
+    ) -> Vec<Vec<&Operation<KvInput, KvOutput>>> {
+        let mut by_key =
+            HashMap::<String, Vec<&Operation<KvInput, KvOutput>>>::new();
+        for op in history.into_iter() {
+            by_key.entry(op.call_op.key.clone()).or_default().push(op);
+        }
+        let mut result = vec![];
+        for (_, values) in by_key {
+            result.push(values);
+        }
+        result
+    }
+
+    fn step(&mut self, input: &KvInput, output: &KvOutput) -> bool {
+        match input.op {
+            KvOp::Get => self.expected_output == *output,
+            KvOp::Put => {
+                self.expected_output = input.value.clone();
+                true
+            }
+            KvOp::Append => {
+                self.expected_output += &input.value;
+                true
+            }
+        }
+    }
+}

+ 195 - 0
linearizability/src/offset_linked_list.rs

@@ -0,0 +1,195 @@
+use std::mem::MaybeUninit;
+
+struct Node<T> {
+    prev: usize,
+    succ: usize,
+    data: MaybeUninit<T>,
+}
+
+impl<T> Default for Node<T> {
+    fn default() -> Self {
+        Self {
+            prev: 0,
+            succ: 0,
+            data: MaybeUninit::uninit(),
+        }
+    }
+}
+
+pub struct OffsetLinkedList<T> {
+    nodes: Vec<Node<T>>,
+}
+
+#[derive(Copy, Clone, Eq, PartialEq)]
+pub struct NodeRef(pub usize);
+
+impl NodeRef {
+    fn from_index(index: usize) -> Option<Self> {
+        if index == OffsetLinkedList::<()>::HEAD {
+            None
+        } else {
+            Some(Self(index - 1))
+        }
+    }
+}
+
+pub struct Iter<'a, T> {
+    list: &'a OffsetLinkedList<T>,
+    index: usize,
+}
+
+impl<'a, T> Iterator for Iter<'a, T> {
+    type Item = &'a T;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.index == OffsetLinkedList::<()>::HEAD {
+            None
+        } else {
+            let node = self.list.at(self.index);
+            self.index = node.succ;
+            Some(unsafe { &*node.data.as_ptr() })
+        }
+    }
+}
+
+impl<T> OffsetLinkedList<T> {
+    const HEAD: usize = 0;
+
+    pub fn create(data: Vec<T>) -> Self {
+        let len = data.len();
+        let mut nodes = Vec::with_capacity(len + 1);
+        for _ in 0..len + 1 {
+            nodes.push(Node::default());
+        }
+        for (i, data) in data.into_iter().enumerate() {
+            nodes[i].succ = i + 1;
+            nodes[i + 1].prev = i;
+            nodes[i + 1].data = MaybeUninit::new(data);
+        }
+        nodes[Self::HEAD].prev = len;
+        nodes[len].succ = Self::HEAD;
+        Self { nodes }
+    }
+
+    fn offset_index(&self, index: NodeRef) -> usize {
+        assert!(index.0 + 1 < self.nodes.len());
+        index.0 + 1
+    }
+
+    pub fn lift(&mut self, index: NodeRef) {
+        let index = self.offset_index(index);
+        let prev = self.nodes[index].prev;
+        let succ = self.nodes[index].succ;
+        self.nodes[prev].succ = succ;
+        self.nodes[succ].prev = prev;
+    }
+
+    pub fn unlift(&mut self, index: NodeRef) {
+        let index = self.offset_index(index);
+        let prev = self.nodes[index].prev;
+        let succ = self.nodes[index].succ;
+        self.nodes[prev].succ = index;
+        self.nodes[succ].prev = index;
+    }
+
+    pub fn get(&self, index: NodeRef) -> &T {
+        let index = self.offset_index(index);
+        unsafe { &*self.nodes[index].data.as_ptr() }
+    }
+
+    pub fn prev(&self, index: NodeRef) -> Option<NodeRef> {
+        let index = self.offset_index(index);
+        let succ = self.nodes[index].prev;
+        NodeRef::from_index(succ)
+    }
+
+    pub fn succ(&self, index: NodeRef) -> Option<NodeRef> {
+        let index = self.offset_index(index);
+        NodeRef::from_index(self.nodes[index].succ)
+    }
+
+    pub fn first(&self) -> Option<NodeRef> {
+        NodeRef::from_index(self.nodes[Self::HEAD].succ)
+    }
+
+    pub fn last(&self) -> Option<NodeRef> {
+        NodeRef::from_index(self.nodes[Self::HEAD].prev)
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.nodes[Self::HEAD].succ == Self::HEAD
+    }
+
+    fn at(&self, index: usize) -> &Node<T> {
+        &self.nodes[index]
+    }
+
+    pub fn iter(&self) -> Iter<'_, T> {
+        Iter {
+            list: self,
+            index: 1,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::offset_linked_list::{NodeRef, OffsetLinkedList};
+
+    fn make_list() -> OffsetLinkedList<char> {
+        let data: Vec<char> = ('a'..='z').collect();
+        OffsetLinkedList::create(data)
+    }
+
+    fn assert_char_list_eq(list: &OffsetLinkedList<char>, ans: &str) {
+        let mut list_str = String::new();
+        let mut leg = list.first();
+        while let Some(curr) = leg {
+            list_str.push(*list.get(curr));
+            leg = list.succ(curr);
+        }
+        assert_eq!(&list_str, ans);
+    }
+
+    #[test]
+    fn linked_list() {
+        let mut list = make_list();
+        let data_str: String = ('a'..='z').collect();
+        assert_char_list_eq(&list, &data_str);
+
+        let mut leg = list.first().unwrap();
+        for i in 0..10 {
+            if i % 3 == 0 {
+                list.lift(leg);
+            }
+            leg = list.succ(leg).unwrap();
+        }
+        list.lift(leg);
+        assert_char_list_eq(&list, &"bcefhilmnopqrstuvwxyz");
+
+        list.unlift(NodeRef(0));
+        list.unlift(NodeRef(3));
+        list.unlift(NodeRef(10));
+        assert_char_list_eq(&list, &"abcdefhiklmnopqrstuvwxyz");
+    }
+
+    #[test]
+    fn empty_linked_list() {
+        let mut list = make_list();
+        assert!(!list.is_empty());
+
+        let mut leg = list.first();
+        while let Some(curr) = leg {
+            leg = list.succ(curr);
+            list.lift(curr);
+        }
+        assert!(list.is_empty())
+    }
+
+    #[test]
+    fn iterate_linked_list() {
+        let list_str: String = make_list().iter().collect();
+        let data_str: String = ('a'..='z').collect();
+        assert_eq!(data_str, list_str);
+    }
+}