Browse Source

Move sync log daemon and dependencies to its own file.

Jing Yang, 4 years ago
parent
commit
5663d51ae7
3 files changed, 299 insertions and 276 deletions
  1. +2 −2
      src/install_snapshot.rs
  2. +3 −274
      src/lib.rs
  3. +294 −0
      src/sync_log_entry.rs

+2 −2
src/install_snapshot.rs

@@ -1,10 +1,10 @@
 use crate::check_or_record;
 use crate::daemon_env::ErrorKind;
 use crate::index_term::IndexTerm;
+use crate::sync_log_entry::SyncLogEntryResult;
 use crate::utils::retry_rpc;
 use crate::{
-    Index, Peer, Raft, RaftState, RpcClient, State, SyncLogEntryResult, Term,
-    RPC_DEADLINE,
+    Index, Peer, Raft, RaftState, RpcClient, State, Term, RPC_DEADLINE,
 };
 
 #[derive(Clone, Debug, Serialize, Deserialize)]

+3 −274
src/lib.rs

@@ -8,7 +8,7 @@ extern crate serde_derive;
 extern crate tokio;
 
 use std::convert::TryFrom;
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -17,10 +17,9 @@ use parking_lot::{Condvar, Mutex};
 
 use crate::apply_command::ApplyCommandFnMut;
 pub use crate::apply_command::ApplyCommandMessage;
-use crate::daemon_env::{DaemonEnv, ErrorKind, ThreadEnv};
+use crate::daemon_env::{DaemonEnv, ThreadEnv};
 use crate::election::ElectionState;
 use crate::index_term::IndexTerm;
-use crate::install_snapshot::InstallSnapshotArgs;
 use crate::persister::PersistedRaftState;
 pub use crate::persister::Persister;
 pub(crate) use crate::raft_state::RaftState;
@@ -28,7 +27,6 @@ pub(crate) use crate::raft_state::State;
 pub use crate::rpcs::RpcClient;
 pub use crate::snapshot::Snapshot;
 use crate::snapshot::{RequestSnapshotFnMut, SnapshotDaemon};
-use crate::utils::retry_rpc;
 
 mod apply_command;
 mod daemon_env;
@@ -41,6 +39,7 @@ mod persister;
 mod raft_state;
 pub mod rpcs;
 mod snapshot;
+mod sync_log_entry;
 pub mod utils;
 
 #[derive(
@@ -111,9 +110,6 @@ struct AppendEntriesReply {
     committed: Option<IndexTerm>,
 }
 
-#[repr(align(64))]
-struct Opening(Arc<AtomicUsize>);
-
 // Commands must be
 // 0. 'static: they have to live long enough for thread pools.
 // 1. clone: they are put in vectors and request messages.
@@ -338,19 +334,6 @@ where
     }
 }
 
-enum SyncLogEntryOperation<Command> {
-    AppendEntries(AppendEntriesArgs<Command>),
-    InstallSnapshot(InstallSnapshotArgs),
-    None,
-}
-
-enum SyncLogEntryResult {
-    TermElapsed(Term),
-    Archived(IndexTerm),
-    Diverged(IndexTerm),
-    Success,
-}
-
 // Command must be
 // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
 // 1. clone: they are copied to the persister.
@@ -361,260 +344,6 @@ impl<Command> Raft<Command>
 where
     Command: 'static + Clone + Send + serde::Serialize + Default,
 {
-    fn run_log_entry_daemon(&mut self) {
-        let (tx, rx) = std::sync::mpsc::channel::<Option<Peer>>();
-        self.new_log_entry.replace(tx);
-
-        // Clone everything that the thread needs.
-        let this = self.clone();
-        let join_handle = std::thread::spawn(move || {
-            // Note: do not change this to `let _ = ...`.
-            let _guard = this.daemon_env.for_scope();
-
-            let mut openings = vec![];
-            openings.resize_with(this.peers.len(), || {
-                Opening(Arc::new(AtomicUsize::new(0)))
-            });
-            let openings = openings; // Not mutable beyond this point.
-
-            while let Ok(peer) = rx.recv() {
-                if !this.keep_running.load(Ordering::SeqCst) {
-                    break;
-                }
-                if !this.inner_state.lock().is_leader() {
-                    continue;
-                }
-                for (i, rpc_client) in this.peers.iter().enumerate() {
-                    if i != this.me.0 && peer.map(|p| p.0 == i).unwrap_or(true)
-                    {
-                        // Only schedule a new task if the last task has cleared
-                        // the queue of RPC requests.
-                        if openings[i].0.fetch_add(1, Ordering::SeqCst) == 0 {
-                            this.thread_pool.spawn(Self::sync_log_entry(
-                                this.inner_state.clone(),
-                                rpc_client.clone(),
-                                i,
-                                this.new_log_entry.clone().unwrap(),
-                                openings[i].0.clone(),
-                                this.apply_command_signal.clone(),
-                            ));
-                        }
-                    }
-                }
-            }
-
-            let stop_wait_group = this.stop_wait_group.clone();
-            // Making sure the rest of `this` is dropped before the wait group.
-            drop(this);
-            drop(stop_wait_group);
-        });
-        self.daemon_env.watch_daemon(join_handle);
-    }
-
-    async fn sync_log_entry(
-        rf: Arc<Mutex<RaftState<Command>>>,
-        rpc_client: Arc<RpcClient>,
-        peer_index: usize,
-        rerun: std::sync::mpsc::Sender<Option<Peer>>,
-        opening: Arc<AtomicUsize>,
-        apply_command_signal: Arc<Condvar>,
-    ) {
-        if opening.swap(0, Ordering::SeqCst) == 0 {
-            return;
-        }
-
-        let operation = Self::build_sync_log_entry(&rf, peer_index);
-        let (term, prev_log_index, match_index, succeeded) = match operation {
-            SyncLogEntryOperation::AppendEntries(args) => {
-                let term = args.term;
-                let prev_log_index = args.prev_log_index;
-                let match_index = args.prev_log_index + args.entries.len();
-                let succeeded = Self::append_entries(&rpc_client, args).await;
-
-                (term, prev_log_index, match_index, succeeded)
-            }
-            SyncLogEntryOperation::InstallSnapshot(args) => {
-                let term = args.term;
-                let prev_log_index = args.last_included_index;
-                let match_index = args.last_included_index;
-                let succeeded =
-                    Self::send_install_snapshot(&rpc_client, args).await;
-
-                (term, prev_log_index, match_index, succeeded)
-            }
-            SyncLogEntryOperation::None => return,
-        };
-
-        let peer = Peer(peer_index);
-        match succeeded {
-            Ok(SyncLogEntryResult::Success) => {
-                let mut rf = rf.lock();
-
-                if rf.current_term != term {
-                    return;
-                }
-
-                rf.next_index[peer_index] = match_index + 1;
-                rf.current_step[peer_index] = 0;
-                if match_index > rf.match_index[peer_index] {
-                    rf.match_index[peer_index] = match_index;
-                    if rf.is_leader() && rf.current_term == term {
-                        let mut matched = rf.match_index.to_vec();
-                        let mid = matched.len() / 2 + 1;
-                        matched.sort_unstable();
-                        let new_commit_index = matched[mid];
-                        if new_commit_index > rf.commit_index
-                            && rf.log[new_commit_index].term == rf.current_term
-                        {
-                            rf.commit_index = new_commit_index;
-                            apply_command_signal.notify_one();
-                        }
-                    }
-                }
-            }
-            Ok(SyncLogEntryResult::Archived(committed)) => {
-                if prev_log_index >= committed.index {
-                    eprintln!(
-                        "Peer {} misbehaves: send prev log index {}, got committed {:?}",
-                        peer_index, prev_log_index, committed
-                    );
-                }
-
-                let mut rf = rf.lock();
-                Self::check_committed(&rf, peer, committed.clone());
-
-                rf.current_step[peer_index] = 0;
-                rf.next_index[peer_index] = committed.index;
-
-                // Ignore the error. The log syncing thread must have died.
-                let _ = rerun.send(Some(Peer(peer_index)));
-            }
-            Ok(SyncLogEntryResult::Diverged(committed)) => {
-                if prev_log_index < committed.index {
-                    eprintln!(
-                        "Peer {} misbehaves: diverged at {}, but committed {:?}",
-                        peer_index, prev_log_index, committed
-                    );
-                }
-                let mut rf = rf.lock();
-                Self::check_committed(&rf, peer, committed.clone());
-
-                let step = &mut rf.current_step[peer_index];
-                if *step < 5 {
-                    *step += 1;
-                }
-                let diff = 4 << *step;
-
-                let next_index = &mut rf.next_index[peer_index];
-                if diff >= *next_index {
-                    *next_index = 1usize;
-                } else {
-                    *next_index -= diff;
-                }
-
-                if *next_index < committed.index {
-                    *next_index = committed.index;
-                }
-
-                // Ignore the error. The log syncing thread must have died.
-                let _ = rerun.send(Some(Peer(peer_index)));
-            }
-            // Do nothing, not our term anymore.
-            Ok(SyncLogEntryResult::TermElapsed(_)) => {}
-            Err(_) => {
-                tokio::time::sleep(Duration::from_millis(
-                    HEARTBEAT_INTERVAL_MILLIS,
-                ))
-                .await;
-                // Ignore the error. The log syncing thread must have died.
-                let _ = rerun.send(Some(Peer(peer_index)));
-            }
-        };
-    }
-
-    fn check_committed(
-        rf: &RaftState<Command>,
-        peer: Peer,
-        committed: IndexTerm,
-    ) {
-        if committed.index < rf.log.start() {
-            return;
-        }
-        let local_term = rf.log.at(committed.index).term;
-        if committed.term != local_term {
-            eprintln!(
-                "{:?} committed log diverged at {:?}: {:?} v.s. leader {:?}",
-                peer, committed.index, committed.term, local_term
-            );
-        }
-    }
-
-    fn build_sync_log_entry(
-        rf: &Mutex<RaftState<Command>>,
-        peer_index: usize,
-    ) -> SyncLogEntryOperation<Command> {
-        let rf = rf.lock();
-        if !rf.is_leader() {
-            return SyncLogEntryOperation::None;
-        }
-
-        // To send AppendEntries request, next_index must be strictly larger
-        // than start(). Otherwise we won't be able to know the log term of the
-        // entry right before next_index.
-        if rf.next_index[peer_index] > rf.log.start() {
-            SyncLogEntryOperation::AppendEntries(Self::build_append_entries(
-                &rf, peer_index,
-            ))
-        } else {
-            SyncLogEntryOperation::InstallSnapshot(
-                Self::build_install_snapshot(&rf),
-            )
-        }
-    }
-
-    fn build_append_entries(
-        rf: &RaftState<Command>,
-        peer_index: usize,
-    ) -> AppendEntriesArgs<Command> {
-        let prev_log_index = rf.next_index[peer_index] - 1;
-        let prev_log_term = rf.log[prev_log_index].term;
-        AppendEntriesArgs {
-            term: rf.current_term,
-            leader_id: rf.leader_id,
-            prev_log_index,
-            prev_log_term,
-            entries: rf.log.after(rf.next_index[peer_index]).to_vec(),
-            leader_commit: rf.commit_index,
-        }
-    }
-
-    const APPEND_ENTRIES_RETRY: usize = 1;
-    async fn append_entries(
-        rpc_client: &RpcClient,
-        args: AppendEntriesArgs<Command>,
-    ) -> std::io::Result<SyncLogEntryResult> {
-        let term = args.term;
-        let reply = retry_rpc(
-            Self::APPEND_ENTRIES_RETRY,
-            RPC_DEADLINE,
-            move |_round| rpc_client.call_append_entries(args.clone()),
-        )
-        .await?;
-        Ok(if reply.term == term {
-            if let Some(committed) = reply.committed {
-                if reply.success {
-                    SyncLogEntryResult::Archived(committed)
-                } else {
-                    SyncLogEntryResult::Diverged(committed)
-                }
-            } else {
-                SyncLogEntryResult::Success
-            }
-        } else {
-            SyncLogEntryResult::TermElapsed(reply.term)
-        })
-    }
-
     pub fn start(&self, command: Command) -> Option<(Term, Index)> {
         let mut rf = self.inner_state.lock();
         let term = rf.current_term;

+294 −0
src/sync_log_entry.rs

@@ -0,0 +1,294 @@
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+
+use parking_lot::{Condvar, Mutex};
+
+use crate::index_term::IndexTerm;
+use crate::install_snapshot::InstallSnapshotArgs;
+use crate::utils::retry_rpc;
+use crate::{
+    AppendEntriesArgs, Peer, Raft, RaftState, RpcClient, Term,
+    HEARTBEAT_INTERVAL_MILLIS, RPC_DEADLINE,
+};
+
+#[repr(align(64))]
+struct Opening(Arc<AtomicUsize>);
+
+enum SyncLogEntryOperation<Command> {
+    AppendEntries(AppendEntriesArgs<Command>),
+    InstallSnapshot(InstallSnapshotArgs),
+    None,
+}
+
+enum SyncLogEntryResult {
+    TermElapsed(Term),
+    Archived(IndexTerm),
+    Diverged(IndexTerm),
+    Success,
+}
+
+// Command must be
+// 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
+// 1. clone: they are copied to the persister.
+// 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
+// 3. serialize: they are converted to bytes to persist.
+// 4. default: a default value is used as the first element of log.
+impl<Command> Raft<Command>
+where
+    Command: 'static + Clone + Send + serde::Serialize + Default,
+{
+    pub(crate) fn run_log_entry_daemon(&mut self) {
+        let (tx, rx) = std::sync::mpsc::channel::<Option<Peer>>();
+        self.new_log_entry.replace(tx);
+
+        // Clone everything that the thread needs.
+        let this = self.clone();
+        let join_handle = std::thread::spawn(move || {
+            // Note: do not change this to `let _ = ...`.
+            let _guard = this.daemon_env.for_scope();
+
+            let mut openings = vec![];
+            openings.resize_with(this.peers.len(), || {
+                Opening(Arc::new(AtomicUsize::new(0)))
+            });
+            let openings = openings; // Not mutable beyond this point.
+
+            while let Ok(peer) = rx.recv() {
+                if !this.keep_running.load(Ordering::SeqCst) {
+                    break;
+                }
+                if !this.inner_state.lock().is_leader() {
+                    continue;
+                }
+                for (i, rpc_client) in this.peers.iter().enumerate() {
+                    if i != this.me.0 && peer.map(|p| p.0 == i).unwrap_or(true)
+                    {
+                        // Only schedule a new task if the last task has cleared
+                        // the queue of RPC requests.
+                        if openings[i].0.fetch_add(1, Ordering::SeqCst) == 0 {
+                            this.thread_pool.spawn(Self::sync_log_entry(
+                                this.inner_state.clone(),
+                                rpc_client.clone(),
+                                i,
+                                this.new_log_entry.clone().unwrap(),
+                                openings[i].0.clone(),
+                                this.apply_command_signal.clone(),
+                            ));
+                        }
+                    }
+                }
+            }
+
+            let stop_wait_group = this.stop_wait_group.clone();
+            // Making sure the rest of `this` is dropped before the wait group.
+            drop(this);
+            drop(stop_wait_group);
+        });
+        self.daemon_env.watch_daemon(join_handle);
+    }
+
+    async fn sync_log_entry(
+        rf: Arc<Mutex<RaftState<Command>>>,
+        rpc_client: Arc<RpcClient>,
+        peer_index: usize,
+        rerun: std::sync::mpsc::Sender<Option<Peer>>,
+        opening: Arc<AtomicUsize>,
+        apply_command_signal: Arc<Condvar>,
+    ) {
+        if opening.swap(0, Ordering::SeqCst) == 0 {
+            return;
+        }
+
+        let operation = Self::build_sync_log_entry(&rf, peer_index);
+        let (term, prev_log_index, match_index, succeeded) = match operation {
+            SyncLogEntryOperation::AppendEntries(args) => {
+                let term = args.term;
+                let prev_log_index = args.prev_log_index;
+                let match_index = args.prev_log_index + args.entries.len();
+                let succeeded = Self::append_entries(&rpc_client, args).await;
+
+                (term, prev_log_index, match_index, succeeded)
+            }
+            SyncLogEntryOperation::InstallSnapshot(args) => {
+                let term = args.term;
+                let prev_log_index = args.last_included_index;
+                let match_index = args.last_included_index;
+                let succeeded =
+                    Self::send_install_snapshot(&rpc_client, args).await;
+
+                (term, prev_log_index, match_index, succeeded)
+            }
+            SyncLogEntryOperation::None => return,
+        };
+
+        let peer = Peer(peer_index);
+        match succeeded {
+            Ok(SyncLogEntryResult::Success) => {
+                let mut rf = rf.lock();
+
+                if rf.current_term != term {
+                    return;
+                }
+
+                rf.next_index[peer_index] = match_index + 1;
+                rf.current_step[peer_index] = 0;
+                if match_index > rf.match_index[peer_index] {
+                    rf.match_index[peer_index] = match_index;
+                    if rf.is_leader() && rf.current_term == term {
+                        let mut matched = rf.match_index.to_vec();
+                        let mid = matched.len() / 2 + 1;
+                        matched.sort_unstable();
+                        let new_commit_index = matched[mid];
+                        if new_commit_index > rf.commit_index
+                            && rf.log[new_commit_index].term == rf.current_term
+                        {
+                            rf.commit_index = new_commit_index;
+                            apply_command_signal.notify_one();
+                        }
+                    }
+                }
+            }
+            Ok(SyncLogEntryResult::Archived(committed)) => {
+                if prev_log_index >= committed.index {
+                    eprintln!(
+                        "Peer {} misbehaves: send prev log index {}, got committed {:?}",
+                        peer_index, prev_log_index, committed
+                    );
+                }
+
+                let mut rf = rf.lock();
+                Self::check_committed(&rf, peer, committed.clone());
+
+                rf.current_step[peer_index] = 0;
+                rf.next_index[peer_index] = committed.index;
+
+                // Ignore the error. The log syncing thread must have died.
+                let _ = rerun.send(Some(Peer(peer_index)));
+            }
+            Ok(SyncLogEntryResult::Diverged(committed)) => {
+                if prev_log_index < committed.index {
+                    eprintln!(
+                        "Peer {} misbehaves: diverged at {}, but committed {:?}",
+                        peer_index, prev_log_index, committed
+                    );
+                }
+                let mut rf = rf.lock();
+                Self::check_committed(&rf, peer, committed.clone());
+
+                let step = &mut rf.current_step[peer_index];
+                if *step < 5 {
+                    *step += 1;
+                }
+                let diff = 4 << *step;
+
+                let next_index = &mut rf.next_index[peer_index];
+                if diff >= *next_index {
+                    *next_index = 1usize;
+                } else {
+                    *next_index -= diff;
+                }
+
+                if *next_index < committed.index {
+                    *next_index = committed.index;
+                }
+
+                // Ignore the error. The log syncing thread must have died.
+                let _ = rerun.send(Some(Peer(peer_index)));
+            }
+            // Do nothing, not our term anymore.
+            Ok(SyncLogEntryResult::TermElapsed(_)) => {}
+            Err(_) => {
+                tokio::time::sleep(Duration::from_millis(
+                    HEARTBEAT_INTERVAL_MILLIS,
+                ))
+                .await;
+                // Ignore the error. The log syncing thread must have died.
+                let _ = rerun.send(Some(Peer(peer_index)));
+            }
+        };
+    }
+
+    fn check_committed(
+        rf: &RaftState<Command>,
+        peer: Peer,
+        committed: IndexTerm,
+    ) {
+        if committed.index < rf.log.start() {
+            return;
+        }
+        let local_term = rf.log.at(committed.index).term;
+        if committed.term != local_term {
+            eprintln!(
+                "{:?} committed log diverged at {:?}: {:?} v.s. leader {:?}",
+                peer, committed.index, committed.term, local_term
+            );
+        }
+    }
+
+    fn build_sync_log_entry(
+        rf: &Mutex<RaftState<Command>>,
+        peer_index: usize,
+    ) -> SyncLogEntryOperation<Command> {
+        let rf = rf.lock();
+        if !rf.is_leader() {
+            return SyncLogEntryOperation::None;
+        }
+
+        // To send AppendEntries request, next_index must be strictly larger
+        // than start(). Otherwise we won't be able to know the log term of the
+        // entry right before next_index.
+        if rf.next_index[peer_index] > rf.log.start() {
+            SyncLogEntryOperation::AppendEntries(Self::build_append_entries(
+                &rf, peer_index,
+            ))
+        } else {
+            SyncLogEntryOperation::InstallSnapshot(
+                Self::build_install_snapshot(&rf),
+            )
+        }
+    }
+
+    fn build_append_entries(
+        rf: &RaftState<Command>,
+        peer_index: usize,
+    ) -> AppendEntriesArgs<Command> {
+        let prev_log_index = rf.next_index[peer_index] - 1;
+        let prev_log_term = rf.log[prev_log_index].term;
+        AppendEntriesArgs {
+            term: rf.current_term,
+            leader_id: rf.leader_id,
+            prev_log_index,
+            prev_log_term,
+            entries: rf.log.after(rf.next_index[peer_index]).to_vec(),
+            leader_commit: rf.commit_index,
+        }
+    }
+
+    const APPEND_ENTRIES_RETRY: usize = 1;
+    async fn append_entries(
+        rpc_client: &RpcClient,
+        args: AppendEntriesArgs<Command>,
+    ) -> std::io::Result<SyncLogEntryResult> {
+        let term = args.term;
+        let reply = retry_rpc(
+            Self::APPEND_ENTRIES_RETRY,
+            RPC_DEADLINE,
+            move |_round| rpc_client.call_append_entries(args.clone()),
+        )
+        .await?;
+        Ok(if reply.term == term {
+            if let Some(committed) = reply.committed {
+                if reply.success {
+                    SyncLogEntryResult::Archived(committed)
+                } else {
+                    SyncLogEntryResult::Diverged(committed)
+                }
+            } else {
+                SyncLogEntryResult::Success
+            }
+        } else {
+            SyncLogEntryResult::TermElapsed(reply.term)
+        })
+    }
+}