Selaa lähdekoodia

Complete rewrite the block-for-commit logic in server.

The previous version fails to commit a message if

1. A client sent a request to the current leader.
2. The current leader lost leadership with committing the request.
3. The client retried the logic with the new leader and committed it.
4. The previous leader will not be able to replay the new commit.

Also added two tests.
Jing Yang 4 vuotta sitten
vanhempi
commit
7dce6f66cb
4 muutettua tiedostoa jossa 69 lisäystä ja 149 poistoa
  1. 7 1
      kvraft/src/client.rs
  2. 1 0
      kvraft/src/common.rs
  3. 51 148
      kvraft/src/server.rs
  4. 10 0
      kvraft/tests/service_test.rs

+ 7 - 1
kvraft/src/client.rs

@@ -2,7 +2,7 @@ use super::common::{
     GetArgs, GetReply, KVRaftOptions, PutAppendArgs, PutAppendEnum,
     PutAppendReply, UniqueIdSequence, GET, PUT_APPEND,
 };
-use common::ValidReply;
+use common::{KVError, ValidReply};
 use labrpc::{Client, RequestMessage};
 use serde::de::DeserializeOwned;
 use serde::Serialize;
@@ -87,6 +87,12 @@ impl ClerkInner {
                             self.unique_id = UniqueIdSequence::new();
                         }
                     }
+                    Err(KVError::Expired) | Err(KVError::Conflict) => {
+                        // The client ID happens to be re-used. The request does
+                        // not fail as "Duplicate", because another client has
+                        // committed more than just the sentinel.
+                        self.unique_id = UniqueIdSequence::new();
+                    }
                     Err(_) => {}
                 };
             };

+ 1 - 0
kvraft/src/common.rs

@@ -7,6 +7,7 @@ pub type ClerkId = u64;
     Copy,
     Debug,
     Default,
+    Hash,
     Ord,
     PartialOrd,
     Eq,

+ 51 - 148
kvraft/src/server.rs

@@ -4,6 +4,7 @@ use super::common::{
 };
 use parking_lot::{Condvar, Mutex};
 use ruaft::{Persister, Raft, RpcClient};
+use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::sync::mpsc::{channel, Receiver};
 use std::sync::Arc;
@@ -28,7 +29,8 @@ struct KVServerState {
     me: usize,
     kv: HashMap<String, String>,
     debug_kv: HashMap<String, String>,
-    applied_op: HashMap<ClerkId, UniqueKVOpStep>,
+    applied_op: HashMap<ClerkId, (UniqueId, CommitResult)>,
+    queries: HashMap<UniqueId, Arc<ResultHolder>>,
 }
 
 #[derive(Clone, Serialize, Deserialize)]
@@ -45,17 +47,6 @@ impl Default for KVOp {
     }
 }
 
-struct UniqueKVOpStep {
-    step: KVOpStep,
-    unique_id: UniqueId,
-}
-
-enum KVOpStep {
-    Unseen,
-    Pending(Arc<ResultHolder>),
-    Done(CommitResult),
-}
-
 struct ResultHolder {
     result: Mutex<Result<CommitResult, CommitError>>,
     condvar: Condvar,
@@ -118,42 +109,20 @@ impl KVServer {
         ret
     }
 
-    fn find_op_or_unseen(
-        applied_op: &mut HashMap<ClerkId, UniqueKVOpStep>,
-        unique_id: UniqueId,
-    ) -> &mut UniqueKVOpStep {
-        let ret = applied_op.entry(unique_id.clerk_id).or_insert_with(|| {
-            UniqueKVOpStep {
-                step: KVOpStep::Unseen,
-                unique_id,
-            }
-        });
-
-        // We know that the two unique_ids must come from the same clerk,
-        // because they are found in the same entry of applied_op.
-        assert_eq!(unique_id.clerk_id, ret.unique_id.clerk_id);
-
-        ret
-    }
-
     fn apply_op(&self, unique_id: UniqueId, op: KVOp) {
         // The borrow checker does not allow borrowing two fields of an instance
         // inside a MutexGuard. But it does allow borrowing two fields of the
         // instance itself. Calling deref_mut() on the MutexGuard works, too!
         let state = &mut *self.state.lock();
         let (applied_op, kv) = (&mut state.applied_op, &mut state.kv);
-        let curr_op = Self::find_op_or_unseen(applied_op, unique_id);
-        if unique_id < curr_op.unique_id {
-            // Redelivered.
-            return;
-        }
-        if unique_id == curr_op.unique_id {
-            if let KVOpStep::Done(_) = curr_op.step {
+        let entry = applied_op.entry(unique_id.clerk_id);
+        if let Entry::Occupied(curr) = &entry {
+            let (applied_unique_id, _) = curr.get();
+            if *applied_unique_id >= unique_id {
                 // Redelivered.
                 return;
             }
         }
-        assert!(unique_id >= curr_op.unique_id);
 
         let result = match op {
             KVOp::NoOp => return,
@@ -169,22 +138,20 @@ impl KVServer {
                 CommitResult::Append
             }
         };
-        let last_op = std::mem::replace(
-            curr_op,
-            UniqueKVOpStep {
-                step: KVOpStep::Done(result.clone()),
-                unique_id,
-            },
-        );
-        assert!(unique_id >= last_op.unique_id);
-        if let KVOpStep::Pending(result_holder) = last_op.step {
-            *result_holder.result.lock() = if unique_id == last_op.unique_id {
-                Ok(result)
-            } else {
-                Err(CommitError::Expired(last_op.unique_id))
-            };
-            result_holder.condvar.notify_all();
+
+        match entry {
+            Entry::Occupied(mut curr) => {
+                curr.insert((unique_id, result.clone()));
+            }
+            Entry::Vacant(vacant) => {
+                vacant.insert((unique_id, result.clone()));
+            }
         }
+
+        if let Some(result_holder) = state.queries.remove(&unique_id) {
+            *result_holder.result.lock() = Ok(result);
+            result_holder.condvar.notify_all();
+        };
     }
 
     fn process_command(
@@ -204,106 +171,42 @@ impl KVServer {
         op: KVOp,
         timeout: Duration,
     ) -> Result<CommitResult, CommitError> {
-        let (unseen, result_holder) = {
+        let result_holder = {
             let mut state = self.state.lock();
-            let curr_op =
-                Self::find_op_or_unseen(&mut state.applied_op, unique_id);
-
-            // This is a newer request
-            if unique_id > curr_op.unique_id {
-                let last_op = std::mem::replace(
-                    curr_op,
-                    UniqueKVOpStep {
-                        step: KVOpStep::Unseen,
-                        unique_id,
-                    },
-                );
-                match last_op.step {
-                    KVOpStep::Unseen => {
-                        panic!("Unseen results should never be seen.")
-                    }
-                    // Notify all threads that are still waiting that a new
-                    // request has arrived. This should never happen.
-                    KVOpStep::Pending(result_holder) => {
-                        *result_holder.result.lock() =
-                            Err(CommitError::Expired(last_op.unique_id));
-                        result_holder.condvar.notify_all();
-                    }
-                    KVOpStep::Done(_) => {}
-                }
-            }
-
-            // Now we know unique_id <= curr_op.unique_id.
-            assert!(unique_id <= curr_op.unique_id);
-            match &curr_op.step {
-                KVOpStep::Unseen => {
-                    let result_holder = Arc::new(ResultHolder {
-                        // The default error is timed-out, if no one touches the
-                        // result holder at all.
-                        result: Mutex::new(Err(CommitError::TimedOut)),
-                        condvar: Condvar::new(),
-                    });
-                    curr_op.step = KVOpStep::Pending(result_holder.clone());
-                    (true, result_holder)
-                }
-                // The operation is still pending.
-                KVOpStep::Pending(result_holder) => {
-                    (false, result_holder.clone())
-                }
-                // The operation is a Get
-                KVOpStep::Done(CommitResult::Get(value)) => {
-                    return if unique_id == curr_op.unique_id {
-                        // This is the same operation as the last one
-                        Ok(CommitResult::Get(value.clone()))
-                    } else {
-                        // A past Get operation is being retried. We do not
-                        // know the proper value to return.
-                        Err(CommitError::Expired(unique_id))
-                    };
+            let applied = state.applied_op.get(&unique_id.clerk_id);
+            if let Some((applied_unique_id, result)) = applied {
+                if unique_id < *applied_unique_id {
+                    return Err(CommitError::Expired(unique_id));
+                } else if unique_id == *applied_unique_id {
+                    return Err(CommitError::Duplicate(result.clone()));
                 }
-                // For Put & Append operations, all we know is that all past
-                // operations must have been committed, returning OK.
-                KVOpStep::Done(result) => return Ok(result.clone()),
-            }
+            };
+            let entry = state.queries.entry(unique_id).or_insert_with(|| {
+                Arc::new(ResultHolder {
+                    result: Mutex::new(Err(CommitError::TimedOut)),
+                    condvar: Condvar::new(),
+                })
+            });
+            entry.clone()
         };
-        if unseen {
-            let op = UniqueKVOp { op, unique_id };
-            if self.rf.lock().start(op).is_none() {
-                eprintln!(
-                    "KV server {} is not the leader for {:?}",
-                    self.state.lock().me,
-                    unique_id
-                );
-                let mut state = self.state.lock();
-                let step = state
-                    .applied_op
-                    .get_mut(&unique_id.clerk_id)
-                    .expect("The entry must have been here");
-                if step.unique_id == unique_id {
-                    if let KVOpStep::Pending(_) = &step.step {
-                        // Nobody has touched the entry yet. Let's reset it.
-                        step.step = KVOpStep::Unseen;
-                        *result_holder.result.lock() =
-                            Err(CommitError::NotLeader);
-                        result_holder.condvar.notify_all();
-                    }
-                }
-                return Err(CommitError::NotLeader);
-            }
+
+        let op = UniqueKVOp { op, unique_id };
+        if self.rf.lock().start(op).is_none() {
+            return Err(CommitError::NotLeader);
         }
-        let mut result = result_holder.result.lock();
+
+        let mut guard = result_holder.result.lock();
         // Wait for the op to be committed.
-        result_holder.condvar.wait_for(&mut result, timeout);
-        let result = result.clone();
-        return if let Ok(result) = result {
-            if unseen {
-                Ok(result)
-            } else {
-                Err(CommitError::Duplicate(result))
-            }
-        } else {
-            result
-        };
+        result_holder.condvar.wait_for(&mut guard, timeout);
+
+        // Copy the result out.
+        let result = guard.clone();
+        // If the result is OK, all other requests should see "Duplicate".
+        if let Ok(result) = guard.clone() {
+            *guard = Err(CommitError::Duplicate(result))
+        }
+
+        return result;
     }
 
     const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2);

+ 10 - 0
kvraft/tests/service_test.rs

@@ -122,3 +122,13 @@ fn generic_test(clients: usize, unreliable: bool, maxraftstate: usize) {
 fn basic_service() {
     generic_test(1, false, 0);
 }
+
+#[test]
+fn concurrent_client() {
+    generic_test(5, false, 0);
+}
+
+#[test]
+fn unreliable() {
+    generic_test(5, true, 0);
+}