Selaa lähdekoodia

Implement start() and kill()

Now we are feature complete.
Jing Yang 5 vuotta sitten
vanhempi
commit
8fb61b2fd0
1 muutettua tiedostoa jossa 54 lisäystä ja 16 poistoa
  1. 54 16
      src/lib.rs

+ 54 - 16
src/lib.rs

@@ -30,17 +30,19 @@ enum State {
 #[derive(
     Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize,
 )]
-struct Term(usize);
+pub struct Term(usize);
 #[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
 struct Peer(usize);
 
+pub type Index = usize;
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct Command(usize);
 
 #[derive(Clone, Serialize, Deserialize)]
 struct LogEntry {
     term: Term,
-    index: usize,
+    index: Index,
     // TODO: Allow sending of arbitrary information.
     command: Command,
 }
@@ -50,11 +52,11 @@ struct RaftState {
     voted_for: Option<Peer>,
     log: Vec<LogEntry>,
 
-    commit_index: usize,
-    last_applied: usize,
+    commit_index: Index,
+    last_applied: Index,
 
-    next_index: Vec<usize>,
-    match_index: Vec<usize>,
+    next_index: Vec<Index>,
+    match_index: Vec<Index>,
     current_step: Vec<i64>,
 
     state: State,
@@ -88,7 +90,7 @@ pub struct Raft {
 struct RequestVoteArgs {
     term: Term,
     candidate_id: Peer,
-    last_log_index: usize,
+    last_log_index: Index,
     last_log_term: Term,
 }
 
@@ -102,10 +104,10 @@ struct RequestVoteReply {
 struct AppendEntriesArgs {
     term: Term,
     leader_id: Peer,
-    prev_log_index: usize,
+    prev_log_index: Index,
     prev_log_term: Term,
     entries: Vec<LogEntry>,
-    leader_commit: usize,
+    leader_commit: Index,
 }
 
 #[derive(Clone, Serialize, Deserialize)]
@@ -121,7 +123,7 @@ impl Raft {
         apply_command: Func,
     ) -> Self
     where
-        Func: 'static + Send + FnMut(usize, Command),
+        Func: 'static + Send + FnMut(Index, Command),
     {
         let peer_size = peers.len();
         let state = RaftState {
@@ -512,10 +514,7 @@ impl Raft {
                 // Shutdown signal.
                 let keep_running = self.keep_running.clone();
                 self.thread_pool.spawn(async move {
-                    loop {
-                        if !keep_running.load(Ordering::SeqCst) {
-                            break;
-                        }
+                    while keep_running.load(Ordering::SeqCst) {
                         interval.tick().await;
                         if let Some(args) = Self::build_heartbeat(&rf) {
                             tokio::spawn(Self::send_heartbeat(
@@ -694,7 +693,7 @@ impl Raft {
         mut apply_command: Func,
     ) -> std::thread::JoinHandle<()>
     where
-        Func: 'static + Send + FnMut(usize, Command),
+        Func: 'static + Send + FnMut(Index, Command),
     {
         let keep_running = self.keep_running.clone();
         let rf = self.inner_state.clone();
@@ -730,6 +729,45 @@ impl Raft {
             }
         })
     }
+
+    pub fn start(&self, command: Command) -> Option<(Term, Index)> {
+        let mut rf = self.inner_state.lock();
+        let term = rf.current_term;
+        if rf.state != State::Leader {
+            return None;
+        }
+
+        let index = rf.log.len();
+        rf.log.push(LogEntry {
+            term,
+            index,
+            command,
+        });
+        rf.persist();
+
+        self.new_log_entry
+            .clone()
+            .unwrap()
+            .send(None)
+            .expect("Sending to new log entry queue should never fail.");
+
+        return Some((term, index));
+    }
+
+    pub fn kill(mut self) {
+        self.keep_running.store(false, Ordering::SeqCst);
+        self.election.stop_election_timer();
+        self.new_log_entry.take().map(|n| n.send(None));
+        self.apply_command_signal.notify_all();
+        std::sync::Arc::try_unwrap(self.thread_pool)
+            .expect(
+                "All references to the thread pool should have been dropped.",
+            )
+            .shutdown_timeout(Duration::from_millis(
+                HEARTBEAT_INTERVAL_MILLIS * 2,
+            ));
+        self.inner_state.lock().persist();
+    }
 }
 
 impl RaftState {
@@ -741,7 +779,7 @@ impl RaftState {
         DropGuard::new(move || self.persist())
     }
 
-    fn last_log_index_and_term(&self) -> (usize, Term) {
+    fn last_log_index_and_term(&self) -> (Index, Term) {
         let len = self.log.len();
         assert!(len > 0, "There should always be at least one entry in log");
         (len - 1, self.log.last().unwrap().term)