Ver código fonte

Add a test to prove inconsistency.

Jing Yang 3 anos atrás
pai
commit
0f9d237f52
2 arquivos alterados com 133 adições e 6 exclusões
  1. +43 −4   test_configs/src/interceptor/mod.rs
  2. +90 −2   tests/regression_tests.rs

+ 43 - 4
test_configs/src/interceptor/mod.rs

@@ -193,8 +193,12 @@ impl Config {
         None
     }
 
-    pub async fn put(&self, key: String, value: String) -> Result<(), ()> {
-        let kv_server = self.find_leader().unwrap();
+    pub async fn put_to_kv(
+        &self,
+        kv_server: &KVServer,
+        key: String,
+        value: String,
+    ) -> Result<(), ()> {
         let result = kv_server
             .put_append(PutAppendArgs {
                 key,
@@ -210,6 +214,24 @@ impl Config {
         result.result.map_err(|_| ())
     }
 
+    pub async fn put(&self, key: String, value: String) -> Result<(), ()> {
+        let kv_server = self.find_leader().unwrap();
+        self.put_to_kv(kv_server, key, value).await
+    }
+
+    pub fn spawn_put_to_kv(
+        self: &Arc<Self>,
+        index: usize,
+        key: String,
+        value: String,
+    ) -> impl Future<Output = Result<(), ()>> {
+        let this = self.clone();
+        async move {
+            this.put_to_kv(this.kv_servers[index].as_ref(), key, value)
+                .await
+        }
+    }
+
     pub fn spawn_put(
         self: &Arc<Self>,
         key: String,
@@ -219,12 +241,29 @@ impl Config {
         async move { this.put(key, value).await }
     }
 
-    pub async fn get(&self, key: String) -> Result<String, ()> {
-        let kv_server = self.find_leader().unwrap();
+    pub async fn get_from_kv(
+        &self,
+        kv_server: &KVServer,
+        key: String,
+    ) -> Result<String, ()> {
         let result = kv_server.get(GetArgs { key }).await;
         result.result.map(|v| v.unwrap_or_default()).map_err(|_| ())
     }
 
+    pub fn spawn_get_from_kv(
+        self: &Arc<Self>,
+        index: usize,
+        key: String,
+    ) -> impl Future<Output = Result<String, ()>> {
+        let this = self.clone();
+        async move { this.get_from_kv(this.kv_servers[index].as_ref(), key).await }
+    }
+
+    pub async fn get(&self, key: String) -> Result<String, ()> {
+        let kv_server = self.find_leader().unwrap();
+        self.get_from_kv(kv_server, key).await
+    }
+
     pub fn spawn_get(
         self: &Arc<Self>,
         key: String,

+ 90 - 2
tests/regression_tests.rs

@@ -1,8 +1,9 @@
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
 use ruaft::utils::integration_test::{
     unpack_append_entries_args, unpack_append_entries_reply,
 };
-use std::sync::Arc;
-use std::time::{Duration, Instant};
 use test_configs::interceptor::{make_config, RaftRpcEvent};
 use test_utils::init_test_log;
 
@@ -53,3 +54,90 @@ fn smoke_test() {
     let value = thread_pool.block_on(get).unwrap().unwrap();
     assert_eq!("consistency", value);
 }
+
+#[test]
+fn delayed_commit_consistency_test() {
+    init_test_log!();
+    let server_count = 3;
+    let config = Arc::new(make_config(server_count, None));
+    let thread_pool = tokio::runtime::Runtime::new().unwrap();
+
+    let first_write = thread_pool.spawn(
+        config.spawn_put("consistency".to_string(), "failed".to_string()),
+    );
+    let mut write_handle = None;
+    while let Ok((event, handle)) = config.event_queue.receiver.recv() {
+        if let RaftRpcEvent::AppendEntriesResponse(args, reply) = event {
+            if let Some(index_term) = unpack_append_entries_args(args) {
+                let (term, success) = unpack_append_entries_reply(reply);
+                if term == index_term.term && success && index_term.index >= 1 {
+                    if write_handle.is_none() {
+                        write_handle.replace(handle);
+                        break;
+                    } else {
+                        handle.reply_interrupted_error();
+                    }
+                }
+            }
+        }
+    }
+    let write_handle = write_handle.unwrap();
+    let leader = write_handle.from;
+    assert!(
+        config.kv_servers[leader].raft().get_state().1,
+        "leader should still be leader"
+    );
+
+    // Block everything from/to leader until we see a new leader.
+    let mut new_leader = leader;
+    while let Ok((_event, handle)) = config.event_queue.receiver.recv() {
+        let from = handle.from;
+        if from == leader || handle.to == leader {
+            handle.reply_interrupted_error();
+        } else {
+            handle.unblock();
+            if config.kv_servers[from].raft().get_state().1 {
+                new_leader = from;
+                break;
+            }
+        }
+    }
+    assert_ne!(new_leader, leader, "A new leader must have been elected");
+    assert_eq!(1, config.kv_servers[leader].raft().get_state().0 .0);
+
+    let second_write = thread_pool.spawn(config.spawn_put_to_kv(
+        new_leader,
+        "consistency".to_string(),
+        "guaranteed".to_string(),
+    ));
+    while let Ok((_event, handle)) = config.event_queue.receiver.recv() {
+        if handle.from == leader || handle.to == leader {
+            handle.reply_interrupted_error();
+        } else {
+            handle.unblock();
+        }
+        if second_write.is_finished() {
+            break;
+        }
+    }
+    assert_eq!(1, config.kv_servers[leader].raft().get_state().0 .0);
+    assert!(second_write.is_finished());
+    thread_pool.block_on(second_write).unwrap().unwrap();
+
+    let read = thread_pool
+        .spawn(config.spawn_get_from_kv(leader, "consistency".to_string()));
+    // Spare kv server some time to handle the request.
+    std::thread::sleep(Duration::from_millis(100));
+
+    // Unblocks the write response.
+    write_handle.unblock();
+
+    thread_pool.block_on(first_write).unwrap().unwrap();
+
+    if let Ok(result) = thread_pool.block_on(read).unwrap() {
+        // This is so wrong. The second write was successful.
+        assert_eq!(result, "failed");
+    } else {
+        panic!("The read request should not timeout");
+    }
+}