
Remove code that clears reads after a commit: merge branch 'fix_commit_read'

Jing Yang, 3 years ago
Parent commit: 9f457d8960

+ 1 - 1
kvraft/src/lib.rs

@@ -2,7 +2,7 @@ pub use async_client::{AsyncClerk, AsyncClient};
 pub use client::Clerk;
 pub use common::{
     CommitSentinelArgs, CommitSentinelReply, GetArgs, GetReply, PutAppendArgs,
-    PutAppendReply,
+    PutAppendEnum, PutAppendReply, UniqueId,
 };
 pub use remote_kvraft::RemoteKvraft;
 pub use server::KVServer;

+ 7 - 1
src/utils/integration_test.rs

@@ -1,7 +1,7 @@
 #![cfg(feature = "integration-test")]
 
 use crate::{
-    AppendEntriesArgs, AppendEntriesReply, Peer, RequestVoteArgs,
+    AppendEntriesArgs, AppendEntriesReply, IndexTerm, Peer, RequestVoteArgs,
     RequestVoteReply, Term,
 };
 
@@ -40,6 +40,12 @@ pub fn unpack_request_vote_reply(reply: RequestVoteReply) -> (Term, bool) {
     (reply.term, reply.vote_granted)
 }
 
+pub fn unpack_append_entries_args<T>(
+    request: AppendEntriesArgs<T>,
+) -> Option<IndexTerm> {
+    request.entries.last().map(|e| e.into())
+}
+
 pub fn unpack_append_entries_reply(reply: AppendEntriesReply) -> (Term, bool) {
     (reply.term, reply.success)
 }
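
For reference, the new `unpack_append_entries_args` helper yields `None` for heartbeats (empty `entries`) and the `IndexTerm` of the last carried entry otherwise. A condensed sketch of how the regression tests added later in this commit pair it with `unpack_append_entries_reply` (the `config` and `RaftRpcEvent` names come from the new interceptor module below):

// Sketch, condensed from tests/regression_tests.rs below: drain the
// interceptor's event queue until some peer acknowledges a non-heartbeat
// AppendEntries in its own term.
while let Ok((event, handle)) = config.event_queue.receiver.recv() {
    if let RaftRpcEvent::AppendEntriesResponse(args, reply) = event {
        // None means the request carried no entries, i.e. a heartbeat.
        if let Some(index_term) = unpack_append_entries_args(args) {
            let (term, success) = unpack_append_entries_reply(reply);
            if term == index_term.term && success && index_term.index >= 1 {
                break; // a real log entry was replicated and acknowledged
            }
        }
    }
    handle.unblock(); // let every other RPC proceed normally
}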

+ 6 - 114
src/verify_authority.rs

@@ -188,73 +188,10 @@ impl VerifyAuthorityDaemon {
             return;
         }
 
-        self.clear_committed_requests(current_term, commit_index);
         self.clear_ticked_requests(commit_index);
         self.remove_expired_requests(current_term);
     }
 
-    /// Clears all requests that have seen at least one commit.
-    /// This function handles the following scenario: a verify authority request
-    /// was received, when the `commit_index` was at C. Later as the leader we
-    /// moved the commit index to at least C+1. That implies that when the
-    /// request was first received, no other new commits after C could have been
-    /// added to the log, either by this replica or others. It then follows that
-    /// we can claim we had authority at that point.
-    fn clear_committed_requests(
-        &self,
-        current_term: Term,
-        commit_index: Index,
-    ) {
-        let mut state = self.state.lock();
-        // We might skip some requests that could have been cleared, if we did
-        // not react to the commit notification fast enough, and missed a
-        // commit. This is about the case where in the last iteration
-        // `commit_index` was `ci`, but in this iteration it becomes `ci + 2`
-        // (or even larger), skipping `ci + 1`.
-        //
-        // Obviously skipping a commit is a problem if `ci + 2` and `ci + 1` are
-        // both committed by us in this term. The requests that are cleared by
-        // `+1` will be cleared by `+2` anyway. Similarly it is not a problem if
-        // neither are committed by us in this term, since `+1` will not clear
-        // any requests.
-        //
-        // If `+2` is not committed by us, but `+1` is, we lose the opportunity
-        // to use `+1` to clear requests. The chances of losing this opportunity
-        // are slim, because between `+1` and `+2`, there has to be a missed
-        // heartbeat interval, and a new commit (`+2`) from another leader. We
-        // have plenty of time to run this method before `+2` reaches us.
-        //
-        // Overall it is acceptable to simplify the implementation and risk
-        // losing the mentioned opportunity.
-        if current_term != state.term {
-            return;
-        }
-
-        // Note the commit_index in the queue might not be in increasing order.
-        // We could still have requests that have a smaller commit_index after
-        // this sweep. That is an acceptable tradeoff we are taking.
-        while let Some(head) = state.queue.pop_front() {
-            if head.commit_index >= commit_index {
-                state.queue.push_front(head);
-                break;
-            }
-            // At the start of the term, the previous leader might have exposed
-            // all entries before the sentinel commit to clients. If a request
-            // arrived before the sentinel commit is committed, its commit index
-            // (token.commit_index) might be inaccurate. Thus we cannot allow
-            // the client to return any state before the sentinel index.
-            //
-            // We did not choose the sentinel index but opted for a more strict
-            // commit index, because the index is committed anyway. It should be
-            // delivered to the application really quickly. We paid the price
-            // with latency but made the request more fresh.
-            let _ = head
-                .sender
-                .send(VerifyAuthorityResult::Success(commit_index));
-            state.start.0 += 1;
-        }
-    }
-
     /// Fetches the newest successful RPC response from peers, and marks verify
     /// authority requests as complete if they are covered by more than half of
     /// the replicas.
@@ -608,54 +545,6 @@ mod tests {
         }
     }
 
-    #[test]
-    fn test_clear_committed_requests() {
-        let daemon = init_daemon();
-        let t0 = daemon.verify_authority_async(TERM, COMMIT_INDEX - 2);
-        let t1 = daemon.verify_authority_async(TERM, COMMIT_INDEX - 1);
-        let t2 = daemon.verify_authority_async(TERM, COMMIT_INDEX);
-        let t3 = daemon.verify_authority_async(TERM, COMMIT_INDEX - 2);
-        let t4 = daemon.verify_authority_async(TERM, COMMIT_INDEX + 1);
-        // Run one iteration: no new commit, no new tick, for last term.
-        daemon.run_verify_authority_iteration(PAST_TERM, COMMIT_INDEX);
-        // Tokens should stay as-is.
-        assert_queue_len!(&daemon, 5);
-
-        // Run one iteration: no new commit, no new tick, for next term.
-        daemon.run_verify_authority_iteration(NEXT_TERM, COMMIT_INDEX);
-        // Tokens should stay as-is.
-        assert_queue_len!(&daemon, 5);
-
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
-        assert_queue_len!(&daemon, 3);
-        {
-            let queue = &daemon.state.lock().queue;
-            assert_eq!(queue[0].commit_index, COMMIT_INDEX);
-            // We can reply SUCCESS to this token, but it is not at the beginning of
-            // the queue, so we left it as-is.
-            assert_eq!(queue[1].commit_index, COMMIT_INDEX - 2);
-            assert_eq!(queue[2].commit_index, COMMIT_INDEX + 1);
-        }
-
-        assert_ticket_ready!(t0, VerifyAuthorityResult::Success(COMMIT_INDEX));
-        assert_ticket_ready!(t1, VerifyAuthorityResult::Success(COMMIT_INDEX));
-        let t2 = assert_ticket_pending!(t2);
-        let t3 = assert_ticket_pending!(t3);
-        let t4 = assert_ticket_pending!(t4);
-
-        // Clears the queue even if the sentinel is not committed.
-        // Note this case is impossible in practise. We do not commit anything
-        // until the sentinel is committed.
-        daemon.state.lock().sentinel_commit_index = COMMIT_INDEX + 3;
-        daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX + 2);
-
-        assert_queue_len!(&daemon, 0);
-        let at_index = COMMIT_INDEX + 2;
-        assert_ticket_ready!(t2, VerifyAuthorityResult::Success(at_index));
-        assert_ticket_ready!(t3, VerifyAuthorityResult::Success(at_index));
-        assert_ticket_ready!(t4, VerifyAuthorityResult::Success(at_index));
-    }
-
     #[test]
     fn test_clear_ticked_requests() {
         let daemon = init_daemon();
@@ -906,7 +795,10 @@ mod tests {
         daemon.run_verify_authority_iteration(TERM, COMMIT_INDEX);
         assert_queue_len!(&daemon, 1);
         // Cleared by the committed sentinel.
-        assert_ticket_ready!(t0, VerifyAuthorityResult::Success(COMMIT_INDEX));
+        assert_ticket_ready!(
+            t0,
+            VerifyAuthorityResult::Success(COMMIT_INDEX - 1)
+        );
 
         // New requests t2 and t3.
         let t2 = daemon.verify_authority_async(TERM, COMMIT_INDEX + 1);
@@ -962,7 +854,7 @@ mod tests {
         // Then we lost leadership. Someone became the leader and created new
         // entries. Those entries are committed, but we did not know.
         // So our commit index is not moved.
-        let _prev_term_log_index = COMMIT_INDEX + 2;
+        let prev_term_log_index = COMMIT_INDEX + 2;
         // However, the new leader had answered queries at prev_term_log_index.
 
         // We created a new sentinel, it is not yet committed.
@@ -988,7 +880,7 @@ mod tests {
         daemon.run_verify_authority_iteration(NEXT_TERM, sentinel_commit_index);
         assert_ticket_ready!(
             t,
-            VerifyAuthorityResult::Success(sentinel_commit_index)
+            VerifyAuthorityResult::Success(prev_term_log_index)
         );
     }
 

+ 3 - 0
test_configs/Cargo.toml

@@ -8,11 +8,14 @@ anyhow = "1.0"
 async-trait = "0.1"
 bincode = "1.3.3"
 bytes = "1.1"
+crossbeam-channel = "0.5.5"
+futures-channel = "0.3.21"
 futures-util = "0.3.21"
 kvraft = { path = "../kvraft", features = ["integration-test"] }
 labrpc = "0.2.2"
 linearizability = { path = "../linearizability" }
 log = "0.4"
+once_cell = "1.12.0"
 parking_lot = "0.12"
 rand = "0.8"
 ruaft = { path = "..", features = ["integration-test"] }

+ 312 - 0
test_configs/src/interceptor/mod.rs

@@ -0,0 +1,312 @@
+use std::future::Future;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use async_trait::async_trait;
+use crossbeam_channel::{Receiver, Sender};
+use once_cell::sync::OnceCell;
+
+use kvraft::{
+    GetArgs, KVServer, PutAppendArgs, PutAppendEnum, UniqueId, UniqueKVOp,
+};
+use ruaft::{
+    AppendEntriesArgs, AppendEntriesReply, InstallSnapshotArgs,
+    InstallSnapshotReply, Raft, RemoteRaft, ReplicableCommand, RequestVoteArgs,
+    RequestVoteReply,
+};
+
+use crate::Persister;
+
+type RaftId = usize;
+
+pub struct EventHandle {
+    pub from: RaftId,
+    pub to: RaftId,
+    sender: futures_channel::oneshot::Sender<std::io::Result<()>>,
+}
+
+struct EventStub {
+    receiver: futures_channel::oneshot::Receiver<std::io::Result<()>>,
+}
+
+fn create_event_pair(from: RaftId, to: RaftId) -> (EventHandle, EventStub) {
+    let (sender, receiver) = futures_channel::oneshot::channel();
+    (EventHandle { from, to, sender }, EventStub { receiver })
+}
+
+impl EventHandle {
+    pub fn unblock(self) {
+        self.sender.send(Ok(())).unwrap();
+    }
+
+    pub fn reply_error(self, e: std::io::Error) {
+        self.sender.send(Err(e)).unwrap();
+    }
+
+    pub fn reply_interrupted_error(self) {
+        self.reply_error(std::io::Error::from(std::io::ErrorKind::Interrupted))
+    }
+}
+
+impl EventStub {
+    pub async fn wait(self) -> std::io::Result<()> {
+        self.receiver.await.unwrap_or(Ok(()))
+    }
+}
+
+pub enum RaftRpcEvent<T> {
+    RequestVoteRequest(RequestVoteArgs),
+    RequestVoteResponse(RequestVoteArgs, RequestVoteReply),
+    AppendEntriesRequest(AppendEntriesArgs<T>),
+    AppendEntriesResponse(AppendEntriesArgs<T>, AppendEntriesReply),
+    InstallSnapshotRequest(InstallSnapshotArgs),
+    InstallSnapshotResponse(InstallSnapshotArgs, InstallSnapshotReply),
+}
+
+struct InterceptingRpcClient<T> {
+    from: RaftId,
+    to: RaftId,
+    target: OnceCell<Raft<T>>,
+    event_queue: Sender<(RaftRpcEvent<T>, EventHandle)>,
+}
+
+impl<T> InterceptingRpcClient<T> {
+    async fn intercept(&self, event: RaftRpcEvent<T>) -> std::io::Result<()> {
+        let (handle, stub) = create_event_pair(self.from, self.to);
+        let _ = self.event_queue.send((event, handle));
+        stub.wait().await
+    }
+
+    pub fn set_raft(&self, target: Raft<T>) {
+        self.target
+            .set(target)
+            .map_err(|_| ())
+            .expect("Raft should only be set once");
+    }
+}
+
+#[async_trait]
+impl<T: ReplicableCommand> RemoteRaft<T> for &InterceptingRpcClient<T> {
+    async fn request_vote(
+        &self,
+        args: RequestVoteArgs,
+    ) -> std::io::Result<RequestVoteReply> {
+        let event_result = self
+            .intercept(RaftRpcEvent::RequestVoteRequest(args.clone()))
+            .await;
+        if let Err(e) = event_result {
+            return Err(e);
+        };
+
+        let reply = self.target.wait().process_request_vote(args.clone());
+
+        self.intercept(RaftRpcEvent::RequestVoteResponse(args, reply.clone()))
+            .await
+            .map(|_| reply)
+    }
+
+    async fn append_entries(
+        &self,
+        args: AppendEntriesArgs<T>,
+    ) -> std::io::Result<AppendEntriesReply> {
+        let args_clone = args.clone();
+        let event_result = self
+            .intercept(RaftRpcEvent::AppendEntriesRequest(args_clone))
+            .await;
+        if let Err(e) = event_result {
+            return Err(e);
+        };
+
+        let reply = self.target.wait().process_append_entries(args.clone());
+
+        self.intercept(RaftRpcEvent::AppendEntriesResponse(args, reply.clone()))
+            .await
+            .map(|_| reply)
+    }
+
+    async fn install_snapshot(
+        &self,
+        args: InstallSnapshotArgs,
+    ) -> std::io::Result<InstallSnapshotReply> {
+        let event_result = self
+            .intercept(RaftRpcEvent::InstallSnapshotRequest(args.clone()))
+            .await;
+        if let Err(e) = event_result {
+            return Err(e);
+        };
+
+        let reply = self.target.wait().process_install_snapshot(args.clone());
+
+        self.intercept(RaftRpcEvent::InstallSnapshotResponse(
+            args,
+            reply.clone(),
+        ))
+        .await
+        .map(|_| reply)
+    }
+}
+
+pub struct EventQueue<T> {
+    pub receiver: Receiver<(RaftRpcEvent<T>, EventHandle)>,
+}
+
+fn make_grid_clients<T>(
+    server_count: usize,
+) -> (EventQueue<T>, Vec<Vec<InterceptingRpcClient<T>>>) {
+    let (sender, receiver) = crossbeam_channel::unbounded();
+    let mut all_clients = vec![];
+    for from in 0..server_count {
+        let mut clients = vec![];
+        for to in 0..server_count {
+            let interceptor = InterceptingRpcClient {
+                from,
+                to,
+                target: Default::default(),
+                event_queue: sender.clone(),
+            };
+            clients.push(interceptor);
+        }
+        all_clients.push(clients);
+    }
+    (EventQueue { receiver }, all_clients)
+}
+
+pub struct Config {
+    pub event_queue: EventQueue<UniqueKVOp>,
+    pub kv_servers: Vec<Arc<KVServer>>,
+    seq: AtomicUsize,
+}
+
+impl Config {
+    pub fn find_leader(&self) -> Option<&KVServer> {
+        let start = Instant::now();
+        while start.elapsed() < Duration::from_secs(1) {
+            if let Some(kv_server) = self
+                .kv_servers
+                .iter()
+                .find(|kv_server| kv_server.raft().get_state().1)
+            {
+                return Some(kv_server.as_ref());
+            }
+        }
+        None
+    }
+
+    pub async fn put_to_kv(
+        &self,
+        kv_server: &KVServer,
+        key: String,
+        value: String,
+    ) -> Result<(), ()> {
+        let result = kv_server
+            .put_append(PutAppendArgs {
+                key,
+                value,
+                op: PutAppendEnum::Put,
+                unique_id: UniqueId {
+                    clerk_id: 1,
+                    sequence_id: self.seq.fetch_add(1, Ordering::Relaxed)
+                        as u64,
+                },
+            })
+            .await;
+        result.result.map_err(|_| ())
+    }
+
+    pub async fn put(&self, key: String, value: String) -> Result<(), ()> {
+        let kv_server = self.find_leader().unwrap();
+        self.put_to_kv(kv_server, key, value).await
+    }
+
+    pub fn spawn_put_to_kv(
+        self: &Arc<Self>,
+        index: usize,
+        key: String,
+        value: String,
+    ) -> impl Future<Output = Result<(), ()>> {
+        let this = self.clone();
+        async move {
+            this.put_to_kv(this.kv_servers[index].as_ref(), key, value)
+                .await
+        }
+    }
+
+    pub fn spawn_put(
+        self: &Arc<Self>,
+        key: String,
+        value: String,
+    ) -> impl Future<Output = Result<(), ()>> {
+        let this = self.clone();
+        async move { this.put(key, value).await }
+    }
+
+    pub async fn get_from_kv(
+        &self,
+        kv_server: &KVServer,
+        key: String,
+    ) -> Result<String, ()> {
+        let result = kv_server.get(GetArgs { key }).await;
+        result.result.map(|v| v.unwrap_or_default()).map_err(|_| ())
+    }
+
+    pub fn spawn_get_from_kv(
+        self: &Arc<Self>,
+        index: usize,
+        key: String,
+    ) -> impl Future<Output = Result<String, ()>> {
+        let this = self.clone();
+        async move { this.get_from_kv(this.kv_servers[index].as_ref(), key).await }
+    }
+
+    pub async fn get(&self, key: String) -> Result<String, ()> {
+        let kv_server = self.find_leader().unwrap();
+        self.get_from_kv(kv_server, key).await
+    }
+
+    pub fn spawn_get(
+        self: &Arc<Self>,
+        key: String,
+    ) -> impl Future<Output = Result<String, ()>> {
+        let this = self.clone();
+        async move { this.get(key).await }
+    }
+}
+
+pub fn make_config(server_count: usize, max_state: Option<usize>) -> Config {
+    let (event_queue, clients) = make_grid_clients(server_count);
+    let persister = Arc::new(Persister::new());
+    let mut kv_servers = vec![];
+    let clients: Vec<Vec<&'static InterceptingRpcClient<UniqueKVOp>>> = clients
+        .into_iter()
+        .map(|v| {
+            v.into_iter()
+                .map(|c| {
+                    let c = Box::leak(Box::new(c));
+                    &*c
+                })
+                .collect()
+        })
+        .collect();
+    for (index, client_vec) in clients.iter().enumerate() {
+        let kv_server = KVServer::new(
+            client_vec.to_vec(),
+            index,
+            persister.clone(),
+            max_state,
+        );
+        kv_servers.push(kv_server);
+    }
+
+    for clients in clients.iter() {
+        for j in 0..server_count {
+            clients[j].set_raft(kv_servers[j].raft().clone());
+        }
+    }
+
+    Config {
+        event_queue,
+        kv_servers,
+        seq: AtomicUsize::new(0),
+    }
+}
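
A minimal usage sketch of the interceptor config (assuming a tokio multi-threaded test context, condensed from the regression tests added below): every inter-peer Raft RPC is surfaced on `event_queue` as a `RaftRpcEvent` plus an `EventHandle`, and the test decides per event whether to let it through with `unblock()` or fail it with `reply_interrupted_error()`.

// Sketch: inside a #[tokio::test(flavor = "multi_thread")] body, build a
// three-node cluster and pump intercepted RPC events until a Put issued
// against the current leader completes.
let config = Arc::new(make_config(3, None));
let put = tokio::spawn(config.spawn_put("key".to_string(), "value".to_string()));
while let Ok((_event, handle)) = config.event_queue.receiver.recv() {
    // Each RPC stays blocked until the test releases (or fails) it.
    handle.unblock();
    if put.is_finished() {
        break;
    }
}
put.await.unwrap().expect("put should succeed");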

+ 1 - 0
test_configs/src/lib.rs

@@ -1,3 +1,4 @@
+pub mod interceptor;
 pub mod kvraft;
 mod persister;
 pub mod raft;

+ 136 - 0
tests/regression_tests.rs

@@ -0,0 +1,136 @@
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use ruaft::utils::integration_test::{
+    unpack_append_entries_args, unpack_append_entries_reply,
+};
+use test_configs::interceptor::{make_config, RaftRpcEvent};
+use test_utils::init_test_log;
+
+#[tokio::test(flavor = "multi_thread")]
+async fn smoke_test() {
+    init_test_log!();
+    let server_count = 3;
+    let config = make_config(server_count, None);
+    let config = Arc::new(config);
+    let put = tokio::spawn(
+        config.spawn_put("commit".to_string(), "consistency".to_string()),
+    );
+
+    let mut responded = false;
+    while let Ok((event, handle)) = config.event_queue.receiver.recv() {
+        if let RaftRpcEvent::AppendEntriesResponse(args, reply) = event {
+            if let Some(index_term) = unpack_append_entries_args(args) {
+                let (term, success) = unpack_append_entries_reply(reply);
+                if term == index_term.term && success && index_term.index >= 1 {
+                    responded = true;
+                    break;
+                }
+            }
+        }
+        handle.unblock();
+    }
+    assert!(responded, "At least one peer must have responded OK");
+    let result = put.await.unwrap();
+    assert!(result.is_ok());
+
+    let get = tokio::spawn(config.spawn_get("commit".to_string()));
+    let start = Instant::now();
+    while let Ok((_event, handle)) = config
+        .event_queue
+        .receiver
+        .recv_timeout(Duration::from_secs(1))
+    {
+        if get.is_finished() {
+            break;
+        }
+        if start.elapsed() >= Duration::from_secs(1) {
+            break;
+        }
+        handle.unblock();
+    }
+    assert!(get.is_finished());
+    let value = get.await.unwrap().unwrap();
+    assert_eq!("consistency", value);
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn delayed_commit_consistency_test() {
+    init_test_log!();
+    let server_count = 3;
+    let config = Arc::new(make_config(server_count, None));
+
+    let first_write = tokio::spawn(
+        config.spawn_put("consistency".to_string(), "failed".to_string()),
+    );
+    let mut write_handle = None;
+    while let Ok((event, handle)) = config.event_queue.receiver.recv() {
+        if let RaftRpcEvent::AppendEntriesResponse(args, reply) = event {
+            if let Some(index_term) = unpack_append_entries_args(args) {
+                let (term, success) = unpack_append_entries_reply(reply);
+                if term == index_term.term && success && index_term.index >= 1 {
+                    if write_handle.is_none() {
+                        write_handle.replace(handle);
+                        break;
+                    } else {
+                        handle.reply_interrupted_error();
+                    }
+                }
+            }
+        }
+    }
+    let write_handle = write_handle.unwrap();
+    let leader = write_handle.from;
+    assert!(
+        config.kv_servers[leader].raft().get_state().1,
+        "leader should still be leader"
+    );
+
+    // Block everything from/to leader until we see a new leader.
+    let mut new_leader = leader;
+    while let Ok((_event, handle)) = config.event_queue.receiver.recv() {
+        let from = handle.from;
+        if from == leader || handle.to == leader {
+            handle.reply_interrupted_error();
+        } else {
+            handle.unblock();
+            if config.kv_servers[from].raft().get_state().1 {
+                new_leader = from;
+                break;
+            }
+        }
+    }
+    assert_ne!(new_leader, leader, "A new leader must have been elected");
+    assert_eq!(1, config.kv_servers[leader].raft().get_state().0 .0);
+
+    let second_write = tokio::spawn(config.spawn_put_to_kv(
+        new_leader,
+        "consistency".to_string(),
+        "guaranteed".to_string(),
+    ));
+    while let Ok((_event, handle)) = config.event_queue.receiver.recv() {
+        if handle.from == leader || handle.to == leader {
+            handle.reply_interrupted_error();
+        } else {
+            handle.unblock();
+        }
+        if second_write.is_finished() {
+            break;
+        }
+    }
+    assert_eq!(1, config.kv_servers[leader].raft().get_state().0 .0);
+    assert!(second_write.is_finished());
+    second_write.await.unwrap().unwrap();
+
+    let read = tokio::spawn(
+        config.spawn_get_from_kv(leader, "consistency".to_string()),
+    );
+    // Give the kv server some time to handle the request.
+    std::thread::sleep(Duration::from_millis(100));
+
+    // Unblocks the write response.
+    write_handle.unblock();
+    first_write.await.unwrap().unwrap();
+
+    assert!(read.await.unwrap().is_err());
+}