|
|
@@ -6,11 +6,13 @@ use parking_lot::{Condvar, Mutex};
|
|
|
use ruaft::{Persister, Raft, RpcClient};
|
|
|
use std::collections::hash_map::Entry;
|
|
|
use std::collections::HashMap;
|
|
|
+use std::sync::atomic::{AtomicUsize, Ordering};
|
|
|
use std::sync::mpsc::{channel, Receiver};
|
|
|
use std::sync::Arc;
|
|
|
use std::time::Duration;
|
|
|
|
|
|
pub struct KVServer {
|
|
|
+ me: AtomicUsize,
|
|
|
state: Mutex<KVServerState>,
|
|
|
rf: Mutex<Raft<UniqueKVOp>>,
|
|
|
// snapshot
|
|
|
@@ -21,12 +23,12 @@ type IndexedCommand = (usize, UniqueKVOp);
|
|
|
#[derive(Clone, Default, Serialize, Deserialize)]
|
|
|
pub struct UniqueKVOp {
|
|
|
op: KVOp,
|
|
|
+ me: usize,
|
|
|
unique_id: UniqueId,
|
|
|
}
|
|
|
|
|
|
#[derive(Default)]
|
|
|
struct KVServerState {
|
|
|
- me: usize,
|
|
|
kv: HashMap<String, String>,
|
|
|
debug_kv: HashMap<String, String>,
|
|
|
applied_op: HashMap<ClerkId, (UniqueId, CommitResult)>,
|
|
|
@@ -92,10 +94,8 @@ impl KVServer {
|
|
|
.expect("The receiving end of apply command channel should have not been dropped");
|
|
|
};
|
|
|
let ret = Arc::new(Self {
|
|
|
- state: Mutex::new(KVServerState {
|
|
|
- me,
|
|
|
- ..Default::default()
|
|
|
- }),
|
|
|
+ me: AtomicUsize::new(me),
|
|
|
+ state: Default::default(),
|
|
|
rf: Mutex::new(Raft::new(
|
|
|
servers,
|
|
|
me,
|
|
|
@@ -109,7 +109,7 @@ impl KVServer {
|
|
|
ret
|
|
|
}
|
|
|
|
|
|
- fn apply_op(&self, unique_id: UniqueId, op: KVOp) {
|
|
|
+ fn apply_op(&self, unique_id: UniqueId, leader: usize, op: KVOp) {
|
|
|
// The borrow checker does not allow borrowing two fields of an instance
|
|
|
// inside a MutexGuard. But it does allow borrowing two fields of the
|
|
|
// instance itself. Calling deref_mut() on the MutexGuard works, too!
|
|
|
@@ -149,7 +149,14 @@ impl KVServer {
|
|
|
}
|
|
|
|
|
|
if let Some(result_holder) = state.queries.remove(&unique_id) {
|
|
|
- *result_holder.result.lock() = Ok(result);
|
|
|
+ // If this KV server might not be the same leader that committed
|
|
|
+ // this change. We are not sure if it is a duplicate or a conflict.
|
|
|
+ // To tell the difference, the terms and operations must be stored.
|
|
|
+ *result_holder.result.lock() = if leader == self.me() {
|
|
|
+ Ok(result)
|
|
|
+ } else {
|
|
|
+ Err(CommitError::Conflict)
|
|
|
+ };
|
|
|
result_holder.condvar.notify_all();
|
|
|
};
|
|
|
}
|
|
|
@@ -160,7 +167,7 @@ impl KVServer {
|
|
|
) {
|
|
|
std::thread::spawn(move || {
|
|
|
while let Ok((_, command)) = command_channel.recv() {
|
|
|
- self.apply_op(command.unique_id, command.op);
|
|
|
+ self.apply_op(command.unique_id, command.me, command.op);
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
@@ -190,7 +197,11 @@ impl KVServer {
|
|
|
entry.clone()
|
|
|
};
|
|
|
|
|
|
- let op = UniqueKVOp { op, unique_id };
|
|
|
+ let op = UniqueKVOp {
|
|
|
+ op,
|
|
|
+ me: self.me(),
|
|
|
+ unique_id,
|
|
|
+ };
|
|
|
if self.rf.lock().start(op).is_none() {
|
|
|
return Err(CommitError::NotLeader);
|
|
|
}
|
|
|
@@ -273,6 +284,10 @@ impl KVServer {
|
|
|
PutAppendReply { result }
|
|
|
}
|
|
|
|
|
|
+ pub fn me(&self) -> usize {
|
|
|
+ self.me.load(Ordering::Relaxed)
|
|
|
+ }
|
|
|
+
|
|
|
pub fn raft(&self) -> Raft<UniqueKVOp> {
|
|
|
self.rf.lock().clone()
|
|
|
}
|