Преглед изворни кода

Each unique result now has its own result holder.

So that next request does not wipe out the results of the previous result.
Jing Yang пре 5 година
родитељ
комит
151a13fcb2
1 измењених фајлова са 58 додато и 33 уклоњено
  1. 58 33
      kvraft/src/server.rs

+ 58 - 33
kvraft/src/server.rs

@@ -62,10 +62,15 @@ struct UniqueKVOpStep {
 
 enum KVOpStep {
     Unseen,
-    Pending(Arc<Condvar>),
+    Pending(Arc<ResultHolder>),
     Done(CommitResult),
 }
 
+struct ResultHolder {
+    result: Mutex<Result<CommitResult, CommitError>>,
+    condvar: Condvar,
+}
+
 #[derive(Clone, Debug)]
 enum CommitResult {
     Get(Option<String>),
@@ -73,7 +78,7 @@ enum CommitResult {
     Append,
 }
 
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 enum CommitError {
     NotLeader,
     Expired(UniqueId),
@@ -102,7 +107,7 @@ impl KVServerState {
         self.applied_op
             .entry(unique_id.clerk_id)
             .and_modify(|e| {
-                if e.step == KVOpStep::Unseen {
+                if let KVOpStep::Unseen = e.step {
                     panic!("Unseen op should never been here.")
                 }
             })
@@ -141,6 +146,20 @@ impl KVServer {
 
     fn apply_op(&self, unique_id: UniqueId, op: KVOp) {
         let mut state = self.state.lock();
+        {
+            let curr_op = state.find_op_or_unseen(unique_id);
+            if unique_id < curr_op.unique_id {
+                // Redelivered.
+                return;
+            }
+            if unique_id == curr_op.unique_id {
+                if let KVOpStep::Done(_) = curr_op.step {
+                    // Redelivered.
+                    return;
+                }
+            }
+        }
+
         let result = match op {
             KVOp::NoOp => return,
             KVOp::Get(op) => CommitResult::Get(state.kv.get(&op.key).cloned()),
@@ -158,13 +177,21 @@ impl KVServer {
                 CommitResult::Append
             }
         };
-        let last_result = state.find_op_or_unseen(unique_id);
-        if unique_id > last_result.unique_id {
-            last_result.unique_id = unique_id
-        }
-        let last_step = std::mem::replace(&mut last_result.step, KVOpStep::Done(result);
-        if let KVOpStep::Pending(condvar) = last_step {
-            condvar.notify_all();
+        let last_op = std::mem::replace(
+            state.applied_op.get_mut(&unique_id.clerk_id).expect(""),
+            UniqueKVOpStep {
+                step: KVOpStep::Done(result.clone()),
+                unique_id,
+            },
+        );
+        assert!(unique_id >= last_op.unique_id);
+        if let KVOpStep::Pending(result_holder) = last_op.step {
+            *result_holder.result.lock() = if unique_id == last_op.unique_id {
+                Ok(result)
+            } else {
+                Err(CommitError::Expired(last_op.unique_id))
+            };
+            result_holder.condvar.notify_all();
         }
     }
 
@@ -185,7 +212,7 @@ impl KVServer {
         op: KVOp,
         timeout: Duration,
     ) -> Result<CommitResult, CommitError> {
-        let (unseen, condvar) = {
+        let (unseen, result_holder) = {
             let mut state = self.state.lock();
             let last_result = state.find_op_or_unseen(unique_id);
 
@@ -202,8 +229,8 @@ impl KVServer {
                     }
                     // Notify all threads that are still waiting that a new
                     // request has arrived. This should never happen.
-                    KVOpStep::Pending(condvar) => {
-                        condvar.notify_all();
+                    KVOpStep::Pending(result_holder) => {
+                        result_holder.condvar.notify_all();
                     }
                     KVOpStep::Done(_) => {}
                 }
@@ -214,12 +241,19 @@ impl KVServer {
             assert!(unique_id <= last_result.unique_id);
             match &last_result.step {
                 KVOpStep::Unseen => {
-                    let condvar = Arc::new(Condvar::new());
-                    last_result.step = KVOpStep::Pending(condvar.clone());
-                    (true, condvar)
+                    let result_holder = Arc::new(ResultHolder {
+                        // The default error is timed-out, if no one touches the
+                        // result holder at all.
+                        result: Mutex::new(Err(CommitError::TimedOut)),
+                        condvar: Condvar::new(),
+                    });
+                    last_result.step = KVOpStep::Pending(result_holder.clone());
+                    (true, result_holder)
                 }
                 // The operation is still pending.
-                KVOpStep::Pending(condvar) => (false, condvar.clone()),
+                KVOpStep::Pending(result_holder) => {
+                    (false, result_holder.clone())
+                }
                 // The operation is a Get
                 KVOpStep::Done(CommitResult::Get(value)) => {
                     return if unique_id == last_result.unique_id {
@@ -242,27 +276,18 @@ impl KVServer {
                 return Err(CommitError::NotLeader);
             }
         }
-        let mut state = self.state.lock();
+        let mut result = result_holder.result.lock();
         // Wait for the op to be committed.
-        condvar.wait_for(&mut state, timeout);
-        let step = state
-            .applied_op
-            .get(&unique_id.clerk_id)
-            .ok_or_else(CommitError::Expired(unique_id))?;
-
-        if unique_id != step.unique_id {
-            // The clerk must have seen the result of this request because they
-            // are sending in a new one. Just return error.
-            return Err(CommitError::Expired(unique_id));
-        }
-        return if let KVOpStep::Done(result) = &step.step {
+        result_holder.condvar.wait_for(&mut result, timeout);
+        let result = result.clone();
+        return if let Ok(result) = result {
             if unseen {
-                Ok(result.clone())
+                Ok(result)
             } else {
-                Err(CommitError::Duplicate(result.clone()))
+                Err(CommitError::Duplicate(result))
             }
         } else {
-            Err(CommitError::TimedOut)
+            result
         };
     }