Преглед изворни кода

Move to crossbeam channel.

So that client does not need to be cloned before sending anything.
Jing Yang пре 5 година
родитељ
комит
4dda408a21
4 измењених фајлова са 20 додато и 31 уклоњено
  1. 1 0
      Cargo.toml
  2. 11 16
      src/client.rs
  3. 1 0
      src/lib.rs
  4. 7 15
      src/network.rs

+ 1 - 0
Cargo.toml

@@ -8,6 +8,7 @@ license = "MIT"
 
 [dependencies]
 bytes = "0.5.6"
+crossbeam-channel = "0.5.0"
 futures = { version = "0.3.5", features = ["thread-pool"] }
 parking_lot = "0.11.0"
 rand = "0.7.3"

+ 11 - 16
src/client.rs

@@ -1,4 +1,4 @@
-use std::sync::mpsc::Sender;
+use crossbeam_channel::Sender;
 
 use crate::{
     ClientIdentifier, ReplyMessage, RequestMessage, Result, RpcOnWire,
@@ -6,8 +6,6 @@ use crate::{
 };
 
 // Client interface, used by the RPC client.
-// Clone this interface when before calling `call_rpc`.
-#[derive(Clone)]
 pub struct Client {
     pub(crate) client: ClientIdentifier,
     pub(crate) server: ServerIdentifier,
@@ -29,28 +27,26 @@ impl Client {
     /// * Connection aborted: The client will not receive a reply because the
     /// the connection is closed by the network.
     pub async fn call_rpc(
-        self,
+        &self,
         service_method: String,
         request: RequestMessage,
     ) -> Result<ReplyMessage> {
         let (tx, rx) = futures::channel::oneshot::channel();
-        let (server, client, request_bus) =
-            (self.server, self.client, self.request_bus);
         let rpc = RpcOnWire {
-            client: client.clone(),
-            server,
+            client: self.client.clone(),
+            server: self.server.clone(),
             service_method,
             request,
             reply_channel: tx,
         };
 
-        request_bus.send(rpc).map_err(|e| {
+        self.request_bus.send(rpc).map_err(|e| {
             // The receiving end has been closed. Network connection is broken.
             std::io::Error::new(
                 std::io::ErrorKind::NotConnected,
                 format!(
                     "Cannot send rpc, client {} is disconnected. {}",
-                    client, e
+                    self.client, e
                 ),
             )
         })?;
@@ -68,7 +64,7 @@ impl Client {
 
 #[cfg(test)]
 mod tests {
-    use std::sync::mpsc::{channel, Sender};
+    use crossbeam_channel::{unbounded, Sender};
 
     use super::*;
 
@@ -86,7 +82,7 @@ mod tests {
     fn make_rpc_call_and_reply(
         reply: Result<ReplyMessage>,
     ) -> Result<ReplyMessage> {
-        let (tx, rx) = channel();
+        let (tx, rx) = unbounded();
 
         let handle = std::thread::spawn(move || make_rpc_call(tx));
 
@@ -130,7 +126,7 @@ mod tests {
 
     #[test]
     fn test_call_rpc_remote_dropped() -> Result<()> {
-        let (tx, rx) = channel();
+        let (tx, rx) = unbounded();
 
         let handle = std::thread::spawn(move || make_rpc_call(tx));
 
@@ -152,7 +148,7 @@ mod tests {
 
     #[test]
     fn test_call_rpc_not_connected() -> Result<()> {
-        let (tx, rx) = channel();
+        let (tx, rx) = unbounded();
         {
             drop(rx);
         }
@@ -169,13 +165,12 @@ mod tests {
 
     async fn make_rpc(client: Client) -> Result<ReplyMessage> {
         let request = RequestMessage::from_static(&[0x17, 0x20]);
-        let client = client.clone();
         client.call_rpc("hello".into(), request).await
     }
 
     #[test]
     fn test_call_across_threads() -> Result<()> {
-        let (tx, rx) = channel();
+        let (tx, rx) = unbounded();
         let rpc_future = {
             let client = Client {
                 client: "C".into(),

+ 1 - 0
src/lib.rs

@@ -1,4 +1,5 @@
 extern crate bytes;
+extern crate crossbeam_channel;
 extern crate futures;
 extern crate rand;
 extern crate tokio;

+ 7 - 15
src/network.rs

@@ -1,11 +1,11 @@
 use std::collections::HashMap;
-use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
 use std::sync::{
     atomic::{AtomicBool, Ordering},
     Arc,
 };
 use std::time::{Duration, Instant};
 
+use crossbeam_channel::{Receiver, Sender, TryRecvError};
 use parking_lot::Mutex;
 use rand::{thread_rng, Rng};
 
@@ -314,7 +314,7 @@ impl Network {
     fn new() -> Self {
         // The channel has infinite buffer, could OOM the server if there are
         // too many pending RPCs to be served.
-        let (tx, rx) = channel();
+        let (tx, rx) = crossbeam_channel::unbounded();
         Network {
             reliable: true,
             long_delays: false,
@@ -543,9 +543,7 @@ mod tests {
 
         // Send first request.
         let reply = futures::executor::block_on(
-            client
-                .clone()
-                .call_rpc(JunkRpcs::Echo.name(), request.clone()),
+            client.call_rpc(JunkRpcs::Echo.name(), request.clone()),
         )?;
         assert_eq!(reply_data, reply.as_ref());
         assert_eq!(1, unlock(&network).get_total_rpc_count());
@@ -555,9 +553,7 @@ mod tests {
 
         // Send second request.
         let reply = futures::executor::block_on(
-            client
-                .clone()
-                .call_rpc(JunkRpcs::Echo.name(), request.clone()),
+            client.call_rpc(JunkRpcs::Echo.name(), request.clone()),
         );
         reply.expect_err("Client is blocked");
         assert_eq!(2, unlock(&network).get_total_rpc_count());
@@ -570,9 +566,7 @@ mod tests {
 
         // Send third request.
         let reply = futures::executor::block_on(
-            client
-                .clone()
-                .call_rpc(JunkRpcs::Echo.name(), request.clone()),
+            client.call_rpc(JunkRpcs::Echo.name(), request.clone()),
         );
         reply.expect_err("Client is blocked");
         assert_eq!(3, unlock(&network).get_total_rpc_count());
@@ -586,9 +580,7 @@ mod tests {
 
         // Send forth request.
         let reply = futures::executor::block_on(
-            client
-                .clone()
-                .call_rpc(JunkRpcs::Echo.name(), request.clone()),
+            client.call_rpc(JunkRpcs::Echo.name(), request.clone()),
         );
         reply.expect_err("Network is shutdown");
         assert_eq!(3, unlock(&network).get_total_rpc_count());
@@ -619,7 +611,7 @@ mod tests {
 
                 let mut results = vec![];
                 for _ in 0..RPC_COUNT {
-                    let reply = client.clone().call_rpc(
+                    let reply = client.call_rpc(
                         JunkRpcs::Echo.name(),
                         RequestMessage::from_static(&[0x20, 0x17]),
                     );