瀏覽代碼

Implement the initialization logic.

Jing Yang 5 年之前
父節點
當前提交
297ef1a2cf
共有 2 個文件被更改,包括 97 次插入60 次删除
  1. 84 52
      src/lib.rs
  2. 13 8
      src/rpcs.rs

+ 84 - 52
src/lib.rs

@@ -1,5 +1,3 @@
-#![allow(unused)]
-
 extern crate bincode;
 extern crate futures;
 extern crate labrpc;
@@ -15,7 +13,7 @@ use std::time::Duration;
 use parking_lot::{Condvar, Mutex};
 use rand::{thread_rng, Rng};
 
-use crate::rpcs::RpcClient;
+pub use crate::rpcs::RpcClient;
 use crate::utils::{retry_rpc, DropGuard};
 
 pub mod rpcs;
@@ -37,24 +35,7 @@ struct Term(usize);
 struct Peer(usize);
 
 #[derive(Clone, Copy, Debug, Serialize, Deserialize)]
-struct Command(usize);
-
-// TODO: remove all of the defaults.
-impl Default for State {
-    fn default() -> Self {
-        Self::Leader
-    }
-}
-impl Default for Term {
-    fn default() -> Self {
-        Self(0)
-    }
-}
-impl Default for Peer {
-    fn default() -> Self {
-        Self(0)
-    }
-}
+pub struct Command(usize);
 
 #[derive(Clone, Copy, Serialize, Deserialize)]
 struct LogEntry {
@@ -64,7 +45,6 @@ struct LogEntry {
     command: Command,
 }
 
-#[derive(Default)]
 struct RaftState {
     current_term: Term,
     voted_for: Option<Peer>,
@@ -87,8 +67,8 @@ struct RaftState {
     election_timer: Option<tokio::time::Delay>,
 }
 
-#[derive(Default)]
-struct Raft {
+#[derive(Clone)]
+pub struct Raft {
     inner_state: Arc<Mutex<RaftState>>,
     peers: Vec<RpcClient>,
 
@@ -97,7 +77,8 @@ struct Raft {
     new_log_entry: Option<std::sync::mpsc::Sender<Option<Peer>>>,
     apply_command_signal: Arc<Condvar>,
     keep_running: Arc<AtomicBool>,
-    // applyCh: Sender<ApplyMsg>
+
+    thread_pool: Arc<tokio::runtime::Runtime>,
 }
 
 #[derive(Clone, Serialize, Deserialize)]
@@ -131,16 +112,70 @@ struct AppendEntriesReply {
 }
 
 impl Raft {
-    pub fn new() -> Self {
-        let raft = Self {
-            ..Default::default()
+    pub fn new<Func>(
+        peers: Vec<RpcClient>,
+        me: usize,
+        apply_command: Func,
+    ) -> Self
+    where
+        Func: 'static + Send + FnMut(usize, Command) -> (),
+    {
+        let peer_size = peers.len();
+        let state = RaftState {
+            current_term: Term(0),
+            voted_for: None,
+            log: vec![LogEntry {
+                term: Term(0),
+                index: 0,
+                command: Command(0),
+            }],
+            commit_index: 0,
+            last_applied: 0,
+            next_index: Vec::with_capacity(peer_size),
+            match_index: Vec::with_capacity(peer_size),
+            current_step: Vec::with_capacity(peer_size),
+            state: State::Follower,
+            leader_id: Peer(me),
+            election_cancel_token: None,
+            election_timer: None
         };
-        raft.inner_state.lock().log.push(LogEntry {
-            term: Default::default(),
-            index: 0,
-            command: Command(0),
+        let thread_pool = tokio::runtime::Builder::new()
+            .threaded_scheduler()
+            .enable_time()
+            .thread_name(format!("raft-instance-{}", me))
+            .core_threads(peer_size)
+            .max_threads(peer_size * 2)
+            .build()
+            .expect("Creating thread pool should not fail");
+        let mut this = Raft {
+            inner_state: Arc::new(Mutex::new(state)),
+            peers,
+            me: Peer(me),
+            new_log_entry: None,
+            apply_command_signal: Arc::new(Default::default()),
+            keep_running: Arc::new(Default::default()),
+            thread_pool: Arc::new(thread_pool),
+        };
+
+        // TODO: election timer.
+        // TODO: read persist.
+        this.keep_running.store(true, Ordering::SeqCst);
+        // Running in a standalone thread.
+        this.run_log_entry_daemon();
+        // Running in a standalone thread.
+        this.run_apply_command_daemon(apply_command);
+        // One off function that schedules many little tasks, running on the
+        // internal thread pool.
+        this.schedule_heartbeats(Duration::from_millis(
+            HEARTBEAT_INTERVAL_MILLIS,
+        ));
+
+        // The last step is to start running elections.
+        let election_handle = this.clone();
+        this.thread_pool.spawn(async move {
+            election_handle.run_election();
         });
-        raft
+        this
     }
 
     pub(crate) fn process_request_vote(
@@ -287,15 +322,16 @@ impl Raft {
                 // across threads.
                 let rpc_client = rpc_client.clone();
                 // RPCs are started right away.
-                let one_vote =
-                    tokio::spawn(Self::request_vote(rpc_client, args.clone()));
+                let one_vote = self
+                    .thread_pool
+                    .spawn(Self::request_vote(rpc_client, args.clone()));
                 // Futures must be pinned so that they have Unpin, as required
                 // by futures::future::select.
                 votes.push(one_vote);
             }
         }
 
-        tokio::spawn(Self::count_vote_util_cancelled(
+        self.thread_pool.spawn(Self::count_vote_util_cancelled(
             term,
             self.inner_state.clone(),
             votes,
@@ -395,7 +431,7 @@ impl Raft {
                 let rpc_client = rpc_client.clone();
                 // Shutdown signal.
                 let keep_running = self.keep_running.clone();
-                tokio::spawn(async move {
+                self.thread_pool.spawn(async move {
                     loop {
                         if !keep_running.load(Ordering::SeqCst) {
                             break;
@@ -451,28 +487,24 @@ impl Raft {
         self.new_log_entry.replace(tx.clone());
 
         // Clone everything that the thread needs.
-        let rerun = tx;
-        let peers = self.peers.clone();
-        let rf = self.inner_state.clone();
-        let me = self.me;
-        let keep_running = self.keep_running.clone();
-        let apply_command_signal = self.apply_command_signal.clone();
+        let this = self.clone();
         let handle = std::thread::spawn(move || {
             while let Ok(peer) = rx.recv() {
-                for (i, rpc_client) in peers.iter().enumerate() {
-                    if i != me.0 && peer.map(|p| p.0 == i).unwrap_or(true) {
-                        tokio::spawn(Self::sync_log_entry(
-                            rf.clone(),
+                if !this.keep_running.load(Ordering::SeqCst) {
+                    break;
+                }
+                for (i, rpc_client) in this.peers.iter().enumerate() {
+                    if i != this.me.0 && peer.map(|p| p.0 == i).unwrap_or(true)
+                    {
+                        this.thread_pool.spawn(Self::sync_log_entry(
+                            this.inner_state.clone(),
                             rpc_client.clone(),
                             i,
-                            rerun.clone(),
-                            apply_command_signal.clone(),
+                            this.new_log_entry.clone().unwrap(),
+                            this.apply_command_signal.clone(),
                         ));
                     }
                 }
-                if !keep_running.load(Ordering::SeqCst) {
-                    break;
-                }
             }
         });
 

+ 13 - 8
src/rpcs.rs

@@ -44,7 +44,7 @@ pub(crate) const REQUEST_VOTE_RPC: &'static str = "Raft.RequestVote";
 pub(crate) const APPEND_ENTRIES_RPC: &'static str = "Raft.AppendEntries";
 
 #[derive(Clone)]
-pub(crate) struct RpcClient(Client);
+pub struct RpcClient(Client);
 
 impl RpcClient {
     pub(crate) async fn call_request_vote(
@@ -109,20 +109,25 @@ pub(crate) fn register_server<S: AsRef<str>>(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::Term;
+    use crate::{Peer, Term};
 
     #[test]
     fn test_basic_message() -> std::io::Result<()> {
         let client = {
             let network = Network::run_daemon();
-            let raft = Arc::new(Raft::new());
             let name = "test-basic-message";
 
-            register_server(raft, name, network.clone())?;
             let client = network
                 .lock()
                 .expect("Network lock should not be poisoned")
                 .make_client("test-basic-message", name.to_owned() + "-server");
+
+            let raft = Arc::new(Raft::new(
+                vec![RpcClient(client.clone())],
+                0,
+                |_, _| {},
+            ));
+            register_server(raft, name, network.clone())?;
             client
         };
 
@@ -130,9 +135,9 @@ mod tests {
         let request = RequestVoteArgs {
             term: Term(2021),
 
-            candidate_id: Default::default(),
+            candidate_id: Peer(0),
             last_log_index: 0,
-            last_log_term: Default::default(),
+            last_log_term: Term(0),
         };
         let response = futures::executor::block_on(
             rpc_client.clone().call_request_vote(request),
@@ -141,9 +146,9 @@ mod tests {
 
         let request = AppendEntriesArgs {
             term: Term(2021),
-            leader_id: Default::default(),
+            leader_id: Peer(0),
             prev_log_index: 0,
-            prev_log_term: Default::default(),
+            prev_log_term: Term(0),
             entries: vec![],
             leader_commit: 0,
         };