|
|
@@ -25,6 +25,7 @@ pub struct UniqueKVOp {
|
|
|
|
|
|
#[derive(Default)]
|
|
|
struct KVServerState {
|
|
|
+ me: usize,
|
|
|
kv: HashMap<String, String>,
|
|
|
debug_kv: HashMap<String, String>,
|
|
|
applied_op: HashMap<ClerkId, UniqueKVOpStep>,
|
|
|
@@ -100,7 +101,10 @@ impl KVServer {
|
|
|
.expect("The receiving end of apply command channel should have not been dropped");
|
|
|
};
|
|
|
let ret = Arc::new(Self {
|
|
|
- state: Default::default(),
|
|
|
+ state: Mutex::new(KVServerState {
|
|
|
+ me,
|
|
|
+ ..Default::default()
|
|
|
+ }),
|
|
|
rf: Mutex::new(Raft::new(
|
|
|
servers,
|
|
|
me,
|
|
|
@@ -118,17 +122,12 @@ impl KVServer {
|
|
|
applied_op: &mut HashMap<ClerkId, UniqueKVOpStep>,
|
|
|
unique_id: UniqueId,
|
|
|
) -> &mut UniqueKVOpStep {
|
|
|
- let ret = applied_op
|
|
|
- .entry(unique_id.clerk_id)
|
|
|
- .and_modify(|e| {
|
|
|
- if let KVOpStep::Unseen = e.step {
|
|
|
- panic!("Unseen op should never been here.")
|
|
|
- }
|
|
|
- })
|
|
|
- .or_insert_with(|| UniqueKVOpStep {
|
|
|
+ let ret = applied_op.entry(unique_id.clerk_id).or_insert_with(|| {
|
|
|
+ UniqueKVOpStep {
|
|
|
step: KVOpStep::Unseen,
|
|
|
unique_id,
|
|
|
- });
|
|
|
+ }
|
|
|
+ });
|
|
|
|
|
|
// We know that the two unique_ids must come from the same clerk,
|
|
|
// because they are found in the same entry of applied_op.
|
|
|
@@ -270,6 +269,25 @@ impl KVServer {
|
|
|
if unseen {
|
|
|
let op = UniqueKVOp { op, unique_id };
|
|
|
if self.rf.lock().start(op).is_none() {
|
|
|
+ eprintln!(
|
|
|
+ "KV server {} is not the leader for {:?}",
|
|
|
+ self.state.lock().me,
|
|
|
+ unique_id
|
|
|
+ );
|
|
|
+ let mut state = self.state.lock();
|
|
|
+ let step = state
|
|
|
+ .applied_op
|
|
|
+ .get_mut(&unique_id.clerk_id)
|
|
|
+ .expect("The entry must have been here");
|
|
|
+ if step.unique_id == unique_id {
|
|
|
+ if let KVOpStep::Pending(_) = &step.step {
|
|
|
+ // Nobody has touched the entry yet. Let's reset it.
|
|
|
+ step.step = KVOpStep::Unseen;
|
|
|
+ *result_holder.result.lock() =
|
|
|
+ Err(CommitError::NotLeader);
|
|
|
+ result_holder.condvar.notify_all();
|
|
|
+ }
|
|
|
+ }
|
|
|
return Err(CommitError::NotLeader);
|
|
|
}
|
|
|
}
|