Sfoglia il codice sorgente

Require client to be cloned before sending RPCs.

The reason being that the mspc sender is not Sync, and thus could
not be accessed by more than one thread at the same time.

There seems to be minimal impact on running time.
Jing Yang 5 anni fa
parent
commit
96f6b2da46
2 ha cambiato i file con 49 aggiunte e 11 eliminazioni
  1. 36 6
      src/client.rs
  2. 13 5
      src/network.rs

+ 36 - 6
src/client.rs

@@ -6,6 +6,8 @@ use crate::{
 };
 
 // Client interface, used by the RPC client.
+// Clone this interface when before calling `call_rpc`.
+#[derive(Clone)]
 pub struct Client {
     pub(crate) client: ClientIdentifier,
     pub(crate) server: ServerIdentifier,
@@ -27,27 +29,28 @@ impl Client {
     /// * Connection aborted: The client will not receive a reply because the
     /// the connection is closed by the network.
     pub async fn call_rpc(
-        &self,
+        self,
         service_method: String,
         request: RequestMessage,
     ) -> Result<ReplyMessage> {
         let (tx, rx) = futures::channel::oneshot::channel();
+        let (server, client, request_bus) =
+            (self.server, self.client, self.request_bus);
         let rpc = RpcOnWire {
-            client: self.client.clone(),
-            server: self.server.clone(),
+            client: client.clone(),
+            server,
             service_method,
             request,
             reply_channel: tx,
         };
 
-        self.request_bus.send(rpc).map_err(|e| {
+        request_bus.send(rpc).map_err(|e| {
             // The receiving end has been closed. Network connection is broken.
             std::io::Error::new(
                 std::io::ErrorKind::NotConnected,
                 format!(
                     "Cannot send rpc, client {} is disconnected. {}",
-                    self.client.clone(),
-                    e
+                    client, e
                 ),
             )
         })?;
@@ -163,4 +166,31 @@ mod tests {
         }
         Ok(())
     }
+
+    async fn make_rpc(client: Client) -> Result<ReplyMessage> {
+        let request = RequestMessage::from_static(&[0x17, 0x20]);
+        let client = client.clone();
+        client.call_rpc("hello".into(), request).await
+    }
+
+    #[test]
+    fn test_call_across_threads() -> Result<()> {
+        let (tx, rx) = channel();
+        let rpc_future = {
+            let client = Client {
+                client: "C".into(),
+                server: "S".into(),
+                request_bus: tx,
+            };
+            make_rpc(client)
+        };
+        std::thread::spawn(move || {
+            let _ = futures::executor::block_on(rpc_future);
+        });
+        let rpc = rx.recv().expect("The request message should arrive");
+        rpc.reply_channel
+            .send(Ok(Default::default()))
+            .expect("The reply channel should not be closed");
+        Ok(())
+    }
 }

+ 13 - 5
src/network.rs

@@ -497,7 +497,9 @@ mod tests {
 
         // Send first request.
         let reply = futures::executor::block_on(
-            client.call_rpc(JunkRpcs::Echo.name(), request.clone()),
+            client
+                .clone()
+                .call_rpc(JunkRpcs::Echo.name(), request.clone()),
         )?;
         assert_eq!(reply_data, reply.as_ref());
         assert_eq!(1, unlock(&network).get_total_rpc_count());
@@ -507,7 +509,9 @@ mod tests {
 
         // Send second request.
         let reply = futures::executor::block_on(
-            client.call_rpc(JunkRpcs::Echo.name(), request.clone()),
+            client
+                .clone()
+                .call_rpc(JunkRpcs::Echo.name(), request.clone()),
         );
         reply.expect_err("Client is blocked");
         assert_eq!(2, unlock(&network).get_total_rpc_count());
@@ -520,7 +524,9 @@ mod tests {
 
         // Send third request.
         let reply = futures::executor::block_on(
-            client.call_rpc(JunkRpcs::Echo.name(), request.clone()),
+            client
+                .clone()
+                .call_rpc(JunkRpcs::Echo.name(), request.clone()),
         );
         reply.expect_err("Client is blocked");
         assert_eq!(3, unlock(&network).get_total_rpc_count());
@@ -534,7 +540,9 @@ mod tests {
 
         // Send forth request.
         let reply = futures::executor::block_on(
-            client.call_rpc(JunkRpcs::Echo.name(), request.clone()),
+            client
+                .clone()
+                .call_rpc(JunkRpcs::Echo.name(), request.clone()),
         );
         reply.expect_err("Network is shutdown");
         assert_eq!(3, unlock(&network).get_total_rpc_count());
@@ -565,7 +573,7 @@ mod tests {
 
                 let mut results = vec![];
                 for _ in 0..RPC_COUNT {
-                    let reply = client.call_rpc(
+                    let reply = client.clone().call_rpc(
                         JunkRpcs::Echo.name(),
                         RequestMessage::from_static(&[0x20, 0x17]),
                     );