|
@@ -270,6 +270,10 @@ impl KVServer {
|
|
|
&self,
|
|
&self,
|
|
|
key: String,
|
|
key: String,
|
|
|
) -> Result<Option<String>, KVError> {
|
|
) -> Result<Option<String>, KVError> {
|
|
|
|
|
+ if !self.keep_running.load(Ordering::SeqCst) {
|
|
|
|
|
+ return Err(KVError::NotLeader);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
let result_fut = match self.rf.verify_authority_async() {
|
|
let result_fut = match self.rf.verify_authority_async() {
|
|
|
Some(result_fut) => result_fut,
|
|
Some(result_fut) => result_fut,
|
|
|
None => return Err(KVError::NotLeader),
|
|
None => return Err(KVError::NotLeader),
|
|
@@ -499,6 +503,8 @@ impl KVServer {
|
|
|
for (_, (_, sender)) in self.state.lock().queries.drain() {
|
|
for (_, (_, sender)) in self.state.lock().queries.drain() {
|
|
|
let _ = sender.send(Err(CommitError::NotLeader));
|
|
let _ = sender.send(Err(CommitError::NotLeader));
|
|
|
}
|
|
}
|
|
|
|
|
+ // Drop all read requests.
|
|
|
|
|
+ self.state.lock().index_subscribers.drain();
|
|
|
|
|
|
|
|
let rf = self.raft().clone();
|
|
let rf = self.raft().clone();
|
|
|
// We must drop self to remove the only clone of raft, so that
|
|
// We must drop self to remove the only clone of raft, so that
|