Przeglądaj źródła

Replace SharedSender with the std Sender, which is now Sync.

Jing Yang 2 lat temu
rodzic
commit
9a192c07c1

+ 2 - 3
raft/src/sync_log_entries.rs

@@ -7,7 +7,7 @@ use crate::daemon_env::ErrorKind;
 use crate::heartbeats::HEARTBEAT_INTERVAL;
 use crate::peer_progress::PeerProgress;
 use crate::remote::RemoteContext;
-use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
+use crate::utils::{retry_rpc, RPC_DEADLINE};
 use crate::{
     check_or_record, AppendEntriesArgs, Index, IndexTerm, InstallSnapshotArgs,
     Peer, Raft, RaftState, ReplicableCommand, Term,
@@ -34,7 +34,7 @@ impl Event {
 
 #[derive(Clone)]
 pub(crate) struct SyncLogEntriesComms {
-    tx: SharedSender<Event>,
+    tx: std::sync::mpsc::Sender<Event>,
 }
 
 impl SyncLogEntriesComms {
@@ -68,7 +68,6 @@ pub(crate) fn create(
     peer_size: usize,
 ) -> (SyncLogEntriesComms, SyncLogEntriesDaemon) {
     let (tx, rx) = std::sync::mpsc::channel();
-    let tx = SharedSender::new(tx);
     let peer_progress = (0..peer_size).map(PeerProgress::create).collect();
     (
         SyncLogEntriesComms { tx },

+ 0 - 2
raft/src/utils/mod.rs

@@ -1,7 +1,5 @@
 pub use rpcs::{retry_rpc, RPC_DEADLINE};
-pub use shared_sender::SharedSender;
 
 pub mod do_nothing;
 pub mod integration_test;
 mod rpcs;
-mod shared_sender;

+ 0 - 56
raft/src/utils/shared_sender.rs

@@ -1,56 +0,0 @@
-/// A `std::sync::mpsc::Sender` that is also `Sync`.
-///
-/// The builtin `Sender` is not sync, because it uses internal mutability to
-/// implement an optimization for non-shared one-shot sending. The queue that
-/// backs the sender initially accepts only one item from a single producer.
-/// If the sender is cloned, the internal queue turns into a multi-producer
-/// multi-shot queue. After that, the internal mutability is never invoked
-/// again for the sender. The `Sender` structure becomes essentially immutable
-/// and thus, `Sync`.
-///
-/// This optimization, and the internal mutability is meaningless for the
-/// purpose of this crate. `SharedSender` forces the transition into a shared
-/// queue, and declares itself `Sync`.
-///
-/// Note that the same reasoning does not apply to the `Receiver`. There are
-/// more levels of mutability in the `Receiver`.
-#[derive(Debug)]
-pub struct SharedSender<T>(std::sync::mpsc::Sender<T>);
-
-unsafe impl<T> Sync for SharedSender<T> where T: Sync {}
-// A better way to implement this might be the following.
-//
-// unsafe impl<T> Sync for SharedSender<T> where
-//    std::sync::mpsc::Flavor<T>::Shared: Sync {}
-
-impl<T> SharedSender<T> {
-    /// Create a shared sender.
-    pub fn new(inner: std::sync::mpsc::Sender<T>) -> SharedSender<T> {
-        // Force the transition to a shared queue in Sender.
-        let _clone = inner.clone();
-        SharedSender(inner)
-    }
-
-    /// A proxy to `std::syc::mpsc::Sender::send()`.
-    pub fn send(&self, t: T) -> Result<(), std::sync::mpsc::SendError<T>> {
-        self.0.send(t)
-    }
-}
-
-impl<T> From<std::sync::mpsc::Sender<T>> for SharedSender<T> {
-    fn from(inner: std::sync::mpsc::Sender<T>) -> Self {
-        Self::new(inner)
-    }
-}
-
-impl<T> From<SharedSender<T>> for std::sync::mpsc::Sender<T> {
-    fn from(this: SharedSender<T>) -> Self {
-        this.0
-    }
-}
-
-impl<T> Clone for SharedSender<T> {
-    fn clone(&self) -> Self {
-        Self(self.0.clone())
-    }
-}