Prechádzať zdrojové kódy

Add client interface, and improved error codes.

Jing Yang 5 rokov pred
rodič
commit
f90c7a72c9
4 zmenil súbory, kde vykonal 65 pridanie a 11 odobranie
  1. 54 4
      src/client.rs
  2. 3 3
      src/lib.rs
  3. 7 3
      src/network.rs
  4. 1 1
      src/server.rs

+ 54 - 4
src/client.rs

@@ -1,10 +1,60 @@
-use crate::{ClientIdentifier, RpcOnWire, ServerIdentifier};
+use crate::{
+    ClientIdentifier, ReplyMessage, RequestMessage, Result, RpcOnWire,
+    ServerIdentifier,
+};
 use std::sync::mpsc::Sender;
 
 // Client interface, used by the RPC client.
 pub struct Client {
-    client: ClientIdentifier,
-    server: ServerIdentifier,
+    pub(crate) client: ClientIdentifier,
+    pub(crate) server: ServerIdentifier,
 
-    request_bus: Sender<RpcOnWire>,
+    pub(crate) request_bus: Sender<RpcOnWire>,
+}
+
+impl Client {
+    /// Error type and meaning
+    /// * Connection aborted: The client did not have a chance to send the
+    /// request, or will not receive a reply because the network is down.
+    /// * Not connected: The network does not allow the client to send requests.
+    /// * Broken pipe: The network no longer allows the client to send requests.
+    /// * Not found: The network could not find the target server.
+    /// * Invalid input: The server could not find the service / method to call.
+    /// * Connection reset: The server received the request, but decided to stop
+    /// responding.
+    pub async fn call_rpc(
+        &self,
+        service_method: String,
+        request: RequestMessage,
+    ) -> Result<ReplyMessage> {
+        let (tx, rx) = futures::channel::oneshot::channel();
+        let rpc = RpcOnWire {
+            client: self.client.clone(),
+            server: self.server.clone(),
+            service_method,
+            request,
+            reply_channel: tx,
+        };
+
+        self.request_bus.send(rpc).map_err(|e| {
+            // The receiving end has been closed. Network connection is broken.
+            std::io::Error::new(
+                std::io::ErrorKind::ConnectionAborted,
+                format!(
+                    "Cannot send rpc, client {} is disconnected. {}",
+                    self.client.clone(),
+                    e
+                ),
+            )
+        })?;
+
+        rx.await.map_err(|e| {
+            std::io::Error::new(
+                // The network closed our connection. The server might not even
+                // get a chance to see the request.
+                std::io::ErrorKind::ConnectionAborted,
+                format!("Network request is dropped: {}", e),
+            )
+        })?
+    }
 }

+ 3 - 3
src/lib.rs

@@ -1,5 +1,3 @@
-#![allow(dead_code)]
-
 extern crate bytes;
 extern crate futures;
 
@@ -10,15 +8,17 @@ mod server;
 type Result<T> = std::io::Result<T>;
 pub use client::Client;
 pub use server::Server;
+pub use network::Network;
 
 // Messages passed on network.
 struct RpcOnWire {
     client: ClientIdentifier,
+    #[allow(dead_code)]
     server: ServerIdentifier,
     service_method: String,
     request: RequestMessage,
 
-    reply_channel: std::sync::mpsc::Sender<Result<ReplyMessage>>,
+    reply_channel: futures::channel::oneshot::Sender<Result<ReplyMessage>>,
 }
 
 type RequestMessage = bytes::Bytes;

+ 7 - 3
src/network.rs

@@ -8,7 +8,7 @@ use crate::Result;
 use crate::Server;
 use crate::{ClientIdentifier, RpcOnWire, ServerIdentifier};
 
-struct Network {
+pub struct Network {
     // Settings.
     reliable: bool,
     long_delays: bool,
@@ -48,8 +48,12 @@ impl Network {
         self.keep_running = false;
     }
 
-    pub fn make_connection(_server_name: ServerIdentifier) -> Client {
-        unimplemented!()
+    pub fn make_connection(&self, client: ClientIdentifier, server: ServerIdentifier) -> Client {
+        Client {
+            client,
+            server,
+            request_bus: self.request_bus.clone()
+        }
     }
 
     fn dispatch(&self, client: &ClientIdentifier) -> Result<Arc<Server>> {

+ 1 - 1
src/server.rs

@@ -47,7 +47,7 @@ impl Server {
             let response = match rpc_handler {
                 Some(rpc_handler) => Ok(rpc_handler.call(data)),
                 None => Err(std::io::Error::new(
-                    std::io::ErrorKind::InvalidData,
+                    std::io::ErrorKind::InvalidInput,
                     format!(
                         "Method {} on server {} not found.",
                         service_method, self.name