|
|
@@ -9,7 +9,10 @@ use futures::FutureExt;
|
|
|
use parking_lot::Mutex;
|
|
|
use serde_derive::{Deserialize, Serialize};
|
|
|
|
|
|
-use ruaft::{ApplyCommandMessage, Persister, Raft, RemoteRaft, Term};
|
|
|
+use ruaft::{
|
|
|
+ ApplyCommandMessage, Index, Persister, Raft, RemoteRaft, Term,
|
|
|
+ VerifyAuthorityResult,
|
|
|
+};
|
|
|
use test_utils::log_with;
|
|
|
use test_utils::thread_local_logger::LocalLogger;
|
|
|
|
|
|
@@ -36,6 +39,7 @@ pub struct UniqueKVOp {
|
|
|
|
|
|
#[derive(Default, Serialize, Deserialize)]
|
|
|
struct KVServerState {
|
|
|
+ raft_index: Index,
|
|
|
kv: HashMap<String, String>,
|
|
|
debug_kv: HashMap<String, String>,
|
|
|
applied_op: HashMap<ClerkId, (UniqueId, CommitResult)>,
|
|
|
@@ -50,6 +54,11 @@ struct KVServerState {
|
|
|
>,
|
|
|
),
|
|
|
>,
|
|
|
+ #[serde(skip)]
|
|
|
+ index_subscribers: HashMap<
|
|
|
+ Index,
|
|
|
+ Vec<(String, futures::channel::oneshot::Sender<Option<String>>)>,
|
|
|
+ >,
|
|
|
}
|
|
|
|
|
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
|
|
@@ -198,6 +207,20 @@ impl KVServer {
|
|
|
};
|
|
|
}
|
|
|
|
|
|
+ fn process_read_requests(&self, index: Index) {
|
|
|
+ let mut state = self.state.lock();
|
|
|
+ assert!(index > state.raft_index);
|
|
|
+ for index in state.raft_index..=index {
|
|
|
+ if let Some(read_requests) = state.index_subscribers.remove(&index)
|
|
|
+ {
|
|
|
+ for (key, sender) in read_requests {
|
|
|
+ let _ = sender.send(state.kv.get(&key).cloned());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ state.raft_index = index;
|
|
|
+ }
|
|
|
+
|
|
|
fn restore_state(&self, mut new_state: KVServerState) {
|
|
|
let mut state = self.state.lock();
|
|
|
// Cleanup all existing queries.
|
|
|
@@ -232,6 +255,7 @@ impl KVServer {
|
|
|
command.me,
|
|
|
command.op,
|
|
|
);
|
|
|
+ this.process_read_requests(index);
|
|
|
if let Some(snapshot) = snapshot_holder
|
|
|
.take_snapshot(&this.state.lock(), index)
|
|
|
{
|
|
|
@@ -247,6 +271,49 @@ impl KVServer {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ async fn block_for_read(
|
|
|
+ &self,
|
|
|
+ key: String,
|
|
|
+ ) -> Result<Option<String>, KVError> {
|
|
|
+ let result_fut = match self.rf.verify_authority_async() {
|
|
|
+ Some(result_fut) => result_fut,
|
|
|
+ None => return Err(KVError::NotLeader),
|
|
|
+ };
|
|
|
+ let index =
|
|
|
+ match tokio::time::timeout(Self::DEFAULT_TIMEOUT, result_fut).await
|
|
|
+ {
|
|
|
+ Ok(VerifyAuthorityResult::Success(index)) => index,
|
|
|
+ Ok(VerifyAuthorityResult::TermElapsed) => {
|
|
|
+ return Err(KVError::NotLeader)
|
|
|
+ }
|
|
|
+ Ok(VerifyAuthorityResult::TimedOut) => {
|
|
|
+ return Err(KVError::TimedOut)
|
|
|
+ }
|
|
|
+ Err(_e) => return Err(KVError::TimedOut),
|
|
|
+ };
|
|
|
+ let receiver = {
|
|
|
+ let state = self.state.lock();
|
|
|
+ if state.raft_index >= index {
|
|
|
+ return Ok(state.kv.get(&key).cloned());
|
|
|
+ }
|
|
|
+ let (sender, receiver) = futures::channel::oneshot::channel();
|
|
|
+ // The mutex guard is moved into this scope and dropped here.
|
|
|
+ let mut state = state;
|
|
|
+ match state.index_subscribers.entry(index) {
|
|
|
+ Entry::Occupied(mut occupied) => {
|
|
|
+ occupied.get_mut().push((key, sender))
|
|
|
+ }
|
|
|
+ Entry::Vacant(vacant) => {
|
|
|
+ let queue = vec![(key, sender)];
|
|
|
+ vacant.insert(queue);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ receiver
|
|
|
+ };
|
|
|
+
|
|
|
+ receiver.await.map_err(|_e| KVError::TimedOut)
|
|
|
+ }
|
|
|
+
|
|
|
const UNSEEN_TERM: usize = 0;
|
|
|
const ATTEMPTING_TERM: usize = usize::MAX;
|
|
|
async fn block_for_commit(
|
|
|
@@ -370,31 +437,30 @@ impl KVServer {
|
|
|
|
|
|
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
|
|
|
|
|
|
- pub async fn get(&self, args: GetArgs) -> GetReply {
|
|
|
- let map_dup = match args.op {
|
|
|
- GetEnum::AllowDuplicate => |r| Ok(r),
|
|
|
- GetEnum::NoDuplicate => |_| Err(KVError::Conflict),
|
|
|
- };
|
|
|
+ pub async fn commit_sentinel(&self, args: GetArgs) -> GetReply {
|
|
|
+ assert_eq!(args.op, GetEnum::NoDuplicate);
|
|
|
let result_fut = self.block_for_commit(
|
|
|
args.unique_id,
|
|
|
KVOp::Get(args.key),
|
|
|
Self::DEFAULT_TIMEOUT,
|
|
|
);
|
|
|
let result = match result_fut.await {
|
|
|
- Ok(result) => Ok(result),
|
|
|
- Err(CommitError::Duplicate(result)) => map_dup(result),
|
|
|
- Err(CommitError::NotMe(result)) => map_dup(result),
|
|
|
+ Ok(CommitResult::Get(result)) => Ok(result),
|
|
|
+ Ok(CommitResult::Put) => Err(KVError::Conflict),
|
|
|
+ Ok(CommitResult::Append) => Err(KVError::Conflict),
|
|
|
+ Err(CommitError::Duplicate(_)) => Err(KVError::Conflict),
|
|
|
+ Err(CommitError::NotMe(_)) => Err(KVError::Conflict),
|
|
|
Err(e) => Err(e.into()),
|
|
|
};
|
|
|
- let result = match result {
|
|
|
- Ok(result) => result,
|
|
|
- Err(e) => return GetReply { result: Err(e) },
|
|
|
- };
|
|
|
- let result = match result {
|
|
|
- CommitResult::Get(result) => Ok(result),
|
|
|
- CommitResult::Put => Err(KVError::Conflict),
|
|
|
- CommitResult::Append => Err(KVError::Conflict),
|
|
|
- };
|
|
|
+ GetReply { result }
|
|
|
+ }
|
|
|
+
|
|
|
+ pub async fn get(&self, args: GetArgs) -> GetReply {
|
|
|
+ if args.op == GetEnum::NoDuplicate {
|
|
|
+ return self.commit_sentinel(args).await;
|
|
|
+ }
|
|
|
+ assert_eq!(args.op, GetEnum::AllowDuplicate);
|
|
|
+ let result = self.block_for_read(args.key).await;
|
|
|
GetReply { result }
|
|
|
}
|
|
|
|