Explorar el Código

Implement process command thread.

Jing Yang hace 5 años
padre
commit
ed4bf67bfa
Se han modificado 1 ficheros con 58 adiciones y 15 borrados
  1. 58 15
      kvraft/src/server.rs

+ 58 - 15
kvraft/src/server.rs

@@ -5,19 +5,18 @@ use super::common::{
 use parking_lot::{Condvar, Mutex};
 use ruaft::{Persister, Raft, RpcClient};
 use std::collections::HashMap;
-use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::{channel, Receiver};
 use std::sync::Arc;
 use std::time::Duration;
 
 struct KVServer {
     state: Mutex<KVServerState>,
-    rf: Raft<UniqueKVOp>,
-    command_channel: Receiver<(usize, UniqueKVOp)>,
-    shutdown: AtomicBool,
+    rf: Mutex<Raft<UniqueKVOp>>,
     // snapshot
 }
 
+type IndexedCommand = (usize, UniqueKVOp);
+
 #[derive(Clone, Default, Serialize, Deserialize)]
 struct UniqueKVOp {
     op: KVOp,
@@ -100,27 +99,70 @@ impl KVServer {
         servers: Vec<RpcClient>,
         me: usize,
         persister: Arc<dyn Persister>,
-    ) -> Self {
+    ) -> Arc<Self> {
         let (tx, rx) = channel();
         let apply_command = move |index, command| {
             tx.send((index, command))
                 .expect("The receiving end of apply command channel should have not been dropped");
         };
-        Self {
+        let ret = Arc::new(Self {
             state: Default::default(),
-            rf: Raft::new(
+            rf: Mutex::new(Raft::new(
                 servers,
                 me,
                 persister,
                 apply_command,
                 None,
                 Raft::<UniqueKVOp>::NO_SNAPSHOT,
-            ),
-            command_channel: rx,
-            shutdown: AtomicBool::new(false),
+            )),
+        });
+        ret.clone().process_command(rx);
+        ret
+    }
+
+    fn apply_op(&self, unique_id: UniqueId, op: KVOp) {
+        let mut state = self.state.lock();
+        let result = match op {
+            KVOp::NoOp => return,
+            KVOp::Get(op) => CommitResult::Get(state.kv.get(&op.key).cloned()),
+            KVOp::Put(op) => {
+                state.kv.insert(op.key, op.value);
+                CommitResult::Put
+            }
+            KVOp::Append(op) => {
+                let (key, value) = (op.key, op.value);
+                state
+                    .kv
+                    .entry(key)
+                    .and_modify(|str| str.push_str(&value))
+                    .or_insert(value);
+                CommitResult::Append
+            }
+        };
+        if let Some(step) = state.applied_op.get_mut(&unique_id.clerk_id) {
+            match &step.step {
+                KVOpStep::Pending(condvar) => {
+                    condvar.notify_all();
+                }
+                _ => panic!(),
+            }
+            if unique_id == step.unique_id {
+                step.step = KVOpStep::Done(result);
+            }
         }
     }
 
+    fn process_command(
+        self: Arc<Self>,
+        command_channel: Receiver<IndexedCommand>,
+    ) {
+        std::thread::spawn(move || {
+            while let Ok((_, command)) = command_channel.recv() {
+                self.apply_op(command.unique_id, command.op);
+            }
+        });
+    }
+
     fn block_for_commit(
         &self,
         unique_id: UniqueId,
@@ -186,17 +228,17 @@ impl KVServer {
         };
         if unseen {
             let op = UniqueKVOp { op, unique_id };
-            if self.rf.start(op).is_none() {
+            if self.rf.lock().start(op).is_none() {
                 return Err(CommitError::NotLeader);
             }
         }
         let mut state = self.state.lock();
-        // Wait for the op to be comitted.
+        // Wait for the op to be committed.
         condvar.wait_for(&mut state, timeout);
         let step = state
             .applied_op
             .get(&unique_id.clerk_id)
-            .expect("Clerk entry should have been inserted.");
+            .ok_or_else(CommitError::Expired(unique_id))?;
 
         if unique_id != step.unique_id {
             // The clerk must have seen the result of this request because they
@@ -283,7 +325,8 @@ impl KVServer {
     }
 
     pub fn kill(self) {
-        self.shutdown.store(true, Ordering::Relaxed);
-        self.rf.kill()
+        self.rf.into_inner().kill()
+        // The process_command thread will exit, after Raft drops the reference
+        // to the sender.
     }
 }