Ver Fonte

A clever trick to make Raft sync.

See the comments in SharedSender.
Jing Yang há 4 anos atrás
pai
commit
9d5b4fd65b
5 ficheiros alterados com 78 adições e 12 exclusões
  1. 7 7
      kvraft/src/server.rs
  2. 2 2
      src/election.rs
  3. 16 1
      src/lib.rs
  4. 3 2
      src/sync_log_entries.rs
  5. 50 0
      src/utils.rs

+ 7 - 7
kvraft/src/server.rs

@@ -20,7 +20,7 @@ use crate::snapshot_holder::SnapshotHolder;
 pub struct KVServer {
     me: AtomicUsize,
     state: Mutex<KVServerState>,
-    rf: Mutex<Raft<UniqueKVOp>>,
+    rf: Raft<UniqueKVOp>,
     keep_running: AtomicBool,
     logger: LocalLogger,
 }
@@ -109,14 +109,14 @@ impl KVServer {
         let ret = Arc::new(Self {
             me: AtomicUsize::new(me),
             state: Default::default(),
-            rf: Mutex::new(Raft::new(
+            rf: Raft::new(
                 servers,
                 me,
                 persister,
                 apply_command,
                 max_state_size_bytes,
                 move |index| snapshot_holder_clone.request_snapshot(index),
-            )),
+            ),
             keep_running: AtomicBool::new(true),
             logger: LocalLogger::inherit(),
         });
@@ -224,7 +224,7 @@ impl KVServer {
                             if let Some(snapshot) = snapshot_holder
                                 .take_snapshot(&this.state.lock(), index)
                             {
-                                this.rf.lock().save_snapshot(snapshot);
+                                this.rf.save_snapshot(snapshot);
                             }
                         }
                     }
@@ -268,7 +268,7 @@ impl KVServer {
             entry.clone()
         };
 
-        let (Term(hold_term), is_leader) = self.rf.lock().get_state();
+        let (Term(hold_term), is_leader) = self.rf.get_state();
         if !is_leader {
             result_holder.condvar.notify_all();
             return Err(CommitError::NotLeader);
@@ -307,7 +307,7 @@ impl KVServer {
                 me: self.me(),
                 unique_id,
             };
-            let start = self.rf.lock().start(op);
+            let start = self.rf.start(op);
             let start_term =
                 start.map_or(Self::UNSEEN_TERM, |(Term(term), _)| {
                     Self::validate_term(term);
@@ -427,7 +427,7 @@ impl KVServer {
     }
 
     pub fn raft(&self) -> Raft<UniqueKVOp> {
-        self.rf.lock().clone()
+        self.rf.clone()
     }
 
     pub fn kill(self: Arc<Self>) {

+ 2 - 2
src/election.rs

@@ -7,7 +7,7 @@ use rand::{thread_rng, Rng};
 
 use crate::daemon_env::Daemon;
 use crate::term_marker::TermMarker;
-use crate::utils::{retry_rpc, RPC_DEADLINE};
+use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
 use crate::{Peer, Raft, RaftState, RemoteRaft, RequestVoteArgs, State, Term};
 
 #[derive(Default)]
@@ -307,7 +307,7 @@ where
         votes: Vec<tokio::task::JoinHandle<Option<bool>>>,
         cancel_token: futures_channel::oneshot::Receiver<()>,
         election: Arc<ElectionState>,
-        new_log_entry: std::sync::mpsc::Sender<Option<Peer>>,
+        new_log_entry: SharedSender<Option<Peer>>,
     ) {
         let quorum = votes.len() >> 1;
         let mut vote_count = 0;

+ 16 - 1
src/lib.rs

@@ -62,7 +62,7 @@ pub struct Raft<Command> {
 
     persister: Arc<dyn Persister>,
 
-    new_log_entry: Option<std::sync::mpsc::Sender<Option<Peer>>>,
+    new_log_entry: Option<utils::SharedSender<Option<Peer>>>,
     apply_command_signal: Arc<Condvar>,
     keep_running: Arc<AtomicBool>,
     election: Arc<ElectionState>,
@@ -298,3 +298,18 @@ where
 }
 
 pub(crate) const HEARTBEAT_INTERVAL_MILLIS: u64 = 150;
+
+#[cfg(test)]
+mod tests {
+    #[test]
+    fn test_raft_must_sync() {
+        let optional_raft: Option<super::Raft<i32>> = None;
+
+        fn must_sync<T: Sync>(value: T) {
+            drop(value)
+        }
+        must_sync(optional_raft)
+        // The following raft is not Sync.
+        // let optional_raft: Option<super::Raft<std::rc::Rc<i32>>> = None;
+    }
+}

+ 3 - 2
src/sync_log_entries.rs

@@ -8,7 +8,7 @@ use crate::check_or_record;
 use crate::daemon_env::{Daemon, ErrorKind};
 use crate::index_term::IndexTerm;
 use crate::term_marker::TermMarker;
-use crate::utils::{retry_rpc, RPC_DEADLINE};
+use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
 use crate::{
     AppendEntriesArgs, InstallSnapshotArgs, Peer, Raft, RaftState, RemoteRaft,
     Term, HEARTBEAT_INTERVAL_MILLIS,
@@ -58,6 +58,7 @@ where
     /// and backoff strategy.
     pub(crate) fn run_log_entry_daemon(&mut self) {
         let (tx, rx) = std::sync::mpsc::channel::<Option<Peer>>();
+        let tx = SharedSender::new(tx);
         self.new_log_entry.replace(tx);
 
         // Clone everything that the thread needs.
@@ -156,7 +157,7 @@ where
         rf: Arc<Mutex<RaftState<Command>>>,
         rpc_client: impl RemoteRaft<Command>,
         peer_index: usize,
-        rerun: std::sync::mpsc::Sender<Option<Peer>>,
+        rerun: SharedSender<Option<Peer>>,
         opening: Arc<AtomicUsize>,
         apply_command_signal: Arc<Condvar>,
         term_marker: TermMarker<Command>,

+ 50 - 0
src/utils.rs

@@ -79,3 +79,53 @@ pub mod integration_test {
         (reply.term, reply.success)
     }
 }
+
+/// A `std::sync::mpsc::Sender` that is also `Sync`.
+///
+/// The builtin `Sender` is not sync, because it uses internal mutability to
+/// implement an optimization for non-shared one-shot sending. The queue that
+/// backs the sender initially accepts only one item from a single producer.
+/// If the sender is cloned, the internal queue turns into a multi-producer
+/// multi-shot queue. After that, the internal mutability is never invoked
+/// again for the sender. The `Sender` structure becomes essentially immutable
+/// and thus, `Sync`.
+///
+/// This optimization, and the internal mutability is meaningless for the
+/// purpose of this crate. `SharedSender` forces the transition into a shared
+/// queue, and declares itself `Sync`.
+///
+/// Note that the same reasoning does not apply to the `Receiver`. There are
+/// more levels of mutability in the `Receiver`.
+#[derive(Clone, Debug)]
+pub struct SharedSender<T>(std::sync::mpsc::Sender<T>);
+
+// Safety: a shared `&SharedSender<T>` allows any thread to `send(t)`, which
+// moves a `T` to the receiving thread, so the required bound is `T: Send`.
+unsafe impl<T> Sync for SharedSender<T> where T: Send {}
+// A better way to implement this might be the following.
+//
+// unsafe impl<T> Sync for SharedSender<T> where
+//    std::sync::mpsc::Flavor<T>::Shared: Sync {}
+
+impl<T> SharedSender<T> {
+    /// Create a shared sender.
+    pub fn new(inner: std::sync::mpsc::Sender<T>) -> SharedSender<T> {
+        // Force the transition to a shared queue in Sender.
+        let _clone = inner.clone();
+        SharedSender(inner)
+    }
+
+    /// A proxy to `std::sync::mpsc::Sender::send()`.
+    pub fn send(&self, t: T) -> Result<(), std::sync::mpsc::SendError<T>> {
+        self.0.send(t)
+    }
+}
+
+impl<T> From<std::sync::mpsc::Sender<T>> for SharedSender<T> {
+    fn from(inner: std::sync::mpsc::Sender<T>) -> Self {
+        Self::new(inner)
+    }
+}
+impl<T> From<SharedSender<T>> for std::sync::mpsc::Sender<T> {
+    fn from(this: SharedSender<T>) -> Self {
+        this.0
+    }
+}