|
|
@@ -1,6 +1,6 @@
|
|
|
use std::collections::hash_map::Entry;
|
|
|
use std::collections::HashMap;
|
|
|
-use std::sync::atomic::{AtomicUsize, Ordering};
|
|
|
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
|
|
use std::sync::mpsc::{channel, Receiver};
|
|
|
use std::sync::Arc;
|
|
|
use std::time::Duration;
|
|
|
@@ -19,7 +19,7 @@ pub struct KVServer {
|
|
|
me: AtomicUsize,
|
|
|
state: Mutex<KVServerState>,
|
|
|
rf: Mutex<Raft<UniqueKVOp>>,
|
|
|
- // snapshot
|
|
|
+ keep_running: AtomicBool,
|
|
|
}
|
|
|
|
|
|
#[derive(Clone, Default, Serialize, Deserialize)]
|
|
|
@@ -114,6 +114,7 @@ impl KVServer {
|
|
|
max_state_size_bytes,
|
|
|
move |index| snapshot_holder_clone.request_snapshot(index),
|
|
|
)),
|
|
|
+ keep_running: AtomicBool::new(true),
|
|
|
});
|
|
|
ret.process_command(snapshot_holder, rx);
|
|
|
ret
|
|
|
@@ -235,6 +236,9 @@ impl KVServer {
|
|
|
op: KVOp,
|
|
|
timeout: Duration,
|
|
|
) -> Result<CommitResult, CommitError> {
|
|
|
+ if !self.keep_running.load(Ordering::SeqCst) {
|
|
|
+ return Err(CommitError::NotLeader);
|
|
|
+ }
|
|
|
let result_holder = {
|
|
|
let mut state = self.state.lock();
|
|
|
let applied = state.applied_op.get(&unique_id.clerk_id);
|
|
|
@@ -413,6 +417,14 @@ impl KVServer {
|
|
|
}
|
|
|
|
|
|
pub fn kill(self: Arc<Self>) {
|
|
|
+ // Return error to new queries.
|
|
|
+ self.keep_running.store(false, Ordering::SeqCst);
|
|
|
+ // Cancel all in-flight queries.
|
|
|
+ for result_holder in self.state.lock().queries.values() {
|
|
|
+ *result_holder.result.lock() = Err(CommitError::NotLeader);
|
|
|
+ result_holder.condvar.notify_all();
|
|
|
+ }
|
|
|
+
|
|
|
let rf = self.raft();
|
|
|
// We must drop self to remove the only clone of raft, so that
|
|
|
// `rf.kill()` does not block.
|