Преглед изворни кода

Renaming a few structs.

We now have an RPC entity, a request messange and a reply message.
Jing Yang пре 5 година
родитељ
комит
d9e48cb263
2 измењених фајлова са 15 додато и 15 уклоњено
  1. 11 10
      src/lib.rs
  2. 4 5
      src/server.rs

+ 11 - 10
src/lib.rs

@@ -15,15 +15,16 @@ use std::time::{Duration, Instant};
 type Result<T> = std::io::Result<T>;
 
 // Messages passed on network.
-struct RequestMessage {
+struct RpcOnWire {
     client: ClientIdentifier,
     server: ServerIdentifier,
     service_method: String,
-    arg: Bytes,
+    request: RequestMessage,
 
     reply_channel: Sender<Result<ReplyMessage>>,
 }
 
+type RequestMessage = Bytes;
 type ReplyMessage = Bytes;
 
 type ServerIdentifier = String;
@@ -34,7 +35,7 @@ struct Client {
     client: ClientIdentifier,
     server: ServerIdentifier,
 
-    request_bus: Sender<RequestMessage>,
+    request_bus: Sender<RpcOnWire>,
 }
 
 struct Network {
@@ -48,8 +49,8 @@ struct Network {
     servers: HashMap<ServerIdentifier, Arc<Server>>,
 
     // Network bus
-    request_bus: Sender<RequestMessage>,
-    request_pipe: Option<Receiver<RequestMessage>>,
+    request_bus: Sender<RpcOnWire>,
+    request_pipe: Option<Receiver<RpcOnWire>>,
 
     // Closing signal.
     keep_running: bool,
@@ -143,7 +144,7 @@ impl Network {
                 }
 
                 match rx.try_recv() {
-                    Ok(request) => {
+                    Ok(rpc) => {
                         let network_clone = network.clone();
                         thread_pool.spawn_ok(async move {
                             let server_result = {
@@ -152,7 +153,7 @@ impl Network {
                                 );
                                 network.increase_rpc_count();
 
-                                network.dispatch(&request.client)
+                                network.dispatch(&rpc.client)
                             };
 
                             // Cannot use server_result.amp() here, since there
@@ -160,15 +161,15 @@ impl Network {
                             let reply = match server_result {
                                 Ok(server) => {
                                     // Simulates the copy from network to server.
-                                    let data = request.arg.clone();
+                                    let data = rpc.request.clone();
                                     server
-                                        .dispatch(request.service_method, data)
+                                        .dispatch(rpc.service_method, data)
                                         .await
                                 }
                                 Err(e) => Err(e),
                             };
 
-                            if let Err(_e) = request.reply_channel.send(reply) {
+                            if let Err(_e) = rpc.reply_channel.send(reply) {
                                 // TODO(ditsing): log and do nothing.
                             }
                         })

+ 4 - 5
src/server.rs

@@ -1,11 +1,10 @@
-use crate::Result;
-use bytes::Bytes;
+use crate::{ReplyMessage, RequestMessage, Result};
 use std::collections::hash_map::Entry::Vacant;
 use std::sync::Arc;
 
 pub trait RpcHandler {
     // Note this method is not async.
-    fn call(&self, data: Bytes) -> Bytes;
+    fn call(&self, data: RequestMessage) -> ReplyMessage;
 }
 
 struct ServerState {
@@ -30,8 +29,8 @@ impl Server {
     pub async fn dispatch(
         self: Arc<Self>,
         service_method: String,
-        data: Bytes,
-    ) -> Result<Bytes> {
+        data: RequestMessage,
+    ) -> Result<ReplyMessage> {
         let (tx, rx) = futures::channel::oneshot::channel();
         let this = self.clone();
         this.thread_pool.spawn_ok(async move {