|
|
@@ -118,7 +118,7 @@ impl KVServer {
|
|
|
applied_op: &mut HashMap<ClerkId, UniqueKVOpStep>,
|
|
|
unique_id: UniqueId,
|
|
|
) -> &mut UniqueKVOpStep {
|
|
|
- applied_op
|
|
|
+ let ret = applied_op
|
|
|
.entry(unique_id.clerk_id)
|
|
|
.and_modify(|e| {
|
|
|
if let KVOpStep::Unseen = e.step {
|
|
|
@@ -128,7 +128,13 @@ impl KVServer {
|
|
|
.or_insert_with(|| UniqueKVOpStep {
|
|
|
step: KVOpStep::Unseen,
|
|
|
unique_id,
|
|
|
- })
|
|
|
+ });
|
|
|
+
|
|
|
+ // We know that the two unique_ids must come from the same clerk,
|
|
|
+ // because they are found in the same entry of applied_op.
|
|
|
+ assert_eq!(unique_id.clerk_id, ret.unique_id.clerk_id);
|
|
|
+
|
|
|
+ ret
|
|
|
}
|
|
|
|
|
|
fn apply_op(&self, unique_id: UniqueId, op: KVOp) {
|
|
|
@@ -201,33 +207,36 @@ impl KVServer {
|
|
|
) -> Result<CommitResult, CommitError> {
|
|
|
let (unseen, result_holder) = {
|
|
|
let mut state = self.state.lock();
|
|
|
- let last_result =
|
|
|
+ let curr_op =
|
|
|
Self::find_op_or_unseen(&mut state.applied_op, unique_id);
|
|
|
|
|
|
- // We know that the two unique_ids must come from the same clerk,
|
|
|
- // because they are found in the same entry of applied_op.
|
|
|
- assert_eq!(unique_id.clerk_id, last_result.unique_id.clerk_id);
|
|
|
-
|
|
|
// This is a newer request
|
|
|
- if unique_id > last_result.unique_id {
|
|
|
- last_result.unique_id = unique_id;
|
|
|
- match &last_result.step {
|
|
|
+ if unique_id > curr_op.unique_id {
|
|
|
+ let last_op = std::mem::replace(
|
|
|
+ curr_op,
|
|
|
+ UniqueKVOpStep {
|
|
|
+ step: KVOpStep::Unseen,
|
|
|
+ unique_id,
|
|
|
+ },
|
|
|
+ );
|
|
|
+ match last_op.step {
|
|
|
KVOpStep::Unseen => {
|
|
|
panic!("Unseen results should never be seen.")
|
|
|
}
|
|
|
// Notify all threads that are still waiting that a new
|
|
|
// request has arrived. This should never happen.
|
|
|
KVOpStep::Pending(result_holder) => {
|
|
|
+ *result_holder.result.lock() =
|
|
|
+ Err(CommitError::Expired(last_op.unique_id));
|
|
|
result_holder.condvar.notify_all();
|
|
|
}
|
|
|
KVOpStep::Done(_) => {}
|
|
|
}
|
|
|
- last_result.step = KVOpStep::Unseen;
|
|
|
}
|
|
|
|
|
|
- // Now we know unique_id <= last_result.unique_id.
|
|
|
- assert!(unique_id <= last_result.unique_id);
|
|
|
- match &last_result.step {
|
|
|
+ // Now we know unique_id <= curr_op.unique_id.
|
|
|
+ assert!(unique_id <= curr_op.unique_id);
|
|
|
+ match &curr_op.step {
|
|
|
KVOpStep::Unseen => {
|
|
|
let result_holder = Arc::new(ResultHolder {
|
|
|
// The default error is timed-out, if no one touches the
|
|
|
@@ -235,7 +244,7 @@ impl KVServer {
|
|
|
result: Mutex::new(Err(CommitError::TimedOut)),
|
|
|
condvar: Condvar::new(),
|
|
|
});
|
|
|
- last_result.step = KVOpStep::Pending(result_holder.clone());
|
|
|
+ curr_op.step = KVOpStep::Pending(result_holder.clone());
|
|
|
(true, result_holder)
|
|
|
}
|
|
|
// The operation is still pending.
|
|
|
@@ -244,7 +253,7 @@ impl KVServer {
|
|
|
}
|
|
|
// The operation is a Get
|
|
|
KVOpStep::Done(CommitResult::Get(value)) => {
|
|
|
- return if unique_id == last_result.unique_id {
|
|
|
+ return if unique_id == curr_op.unique_id {
|
|
|
// This is the same operation as the last one
|
|
|
Ok(CommitResult::Get(value.clone()))
|
|
|
} else {
|