فهرست منبع

Upgrade the shutdown procedure.

The network now shutdown after receiving a default "None" message.
There is not need to deal with shutdown dealines anymore.
Jing Yang 5 سال پیش
والد
کامیت
8b7f0db3ce
2فایلهای تغییر یافته به همراه28 افزوده شده و 16 حذف شده
  1. 15 6
      src/client.rs
  2. 13 10
      src/network.rs

+ 15 - 6
src/client.rs

@@ -12,7 +12,7 @@ pub struct Client {
     pub(crate) client: ClientIdentifier,
     pub(crate) server: ServerIdentifier,
 
-    pub(crate) request_bus: Sender<RpcOnWire>,
+    pub(crate) request_bus: Sender<Option<RpcOnWire>>,
 }
 
 impl Client {
@@ -64,7 +64,7 @@ impl Client {
         };
 
         mark_trace!(local_trace, assemble);
-        self.request_bus.send(rpc).map_err(|e| {
+        self.request_bus.send(Some(rpc)).map_err(|e| {
             // The receiving end has been closed. Network connection is broken.
             std::io::Error::new(
                 std::io::ErrorKind::NotConnected,
@@ -111,7 +111,7 @@ mod tests {
 
     use super::*;
 
-    fn make_rpc_call(tx: Sender<RpcOnWire>) -> Result<ReplyMessage> {
+    fn make_rpc_call(tx: Sender<Option<RpcOnWire>>) -> Result<ReplyMessage> {
         let client = Client {
             client: "C".into(),
             server: "S".into(),
@@ -129,7 +129,10 @@ mod tests {
 
         let handle = std::thread::spawn(move || make_rpc_call(tx));
 
-        let rpc = rx.recv().expect("The request message should arrive");
+        let rpc = rx
+            .recv()
+            .expect("The request message should arrive")
+            .expect("The request message should not be null");
         assert_eq!("C", &rpc.client);
         assert_eq!("S", &rpc.server);
         assert_eq!("hello", &rpc.service_method);
@@ -173,7 +176,10 @@ mod tests {
 
         let handle = std::thread::spawn(move || make_rpc_call(tx));
 
-        let rpc = rx.recv().expect("The request message should arrive");
+        let rpc = rx
+            .recv()
+            .expect("The request message should arrive")
+            .expect("The request message should not be null");
         drop(rpc.reply_channel);
 
         let reply = handle.join().expect("Rpc sending thread should succeed");
@@ -225,7 +231,10 @@ mod tests {
         std::thread::spawn(move || {
             let _ = futures::executor::block_on(rpc_future);
         });
-        let rpc = rx.recv().expect("The request message should arrive");
+        let rpc = rx
+            .recv()
+            .expect("The request message should arrive")
+            .expect("The request message should not be null");
         rpc.reply_channel
             .send(Ok(Default::default()))
             .expect("The reply channel should not be closed");

+ 13 - 10
src/network.rs

@@ -5,7 +5,7 @@ use std::sync::{
 };
 use std::time::{Duration, Instant};
 
-use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
+use crossbeam_channel::{Receiver, RecvError, Sender};
 use parking_lot::Mutex;
 use rand::{thread_rng, Rng};
 
@@ -24,8 +24,8 @@ pub struct Network {
     servers: HashMap<ServerIdentifier, Arc<Server>>,
 
     // Network bus
-    request_bus: Sender<RpcOnWire>,
-    request_pipe: Option<Receiver<RpcOnWire>>,
+    request_bus: Sender<Option<RpcOnWire>>,
+    request_pipe: Option<Receiver<Option<RpcOnWire>>>,
 
     // Closing signal.
     keep_running: bool,
@@ -51,6 +51,9 @@ impl Network {
 
     pub fn stop(&mut self) {
         self.keep_running = false;
+        self.request_bus
+            .send(None)
+            .expect("Sending RPCs should never fail.");
     }
 
     pub fn stopped(&self) -> bool {
@@ -296,16 +299,16 @@ impl Network {
                     stop_timer = Instant::now();
                 }
 
-                match rx.recv_timeout(Self::SHUTDOWN_DELAY / 2) {
-                    Ok(rpc) => {
+                match rx.recv() {
+                    Ok(Some(rpc)) => {
                         mark_trace!(rpc.trace, dequeue);
                         thread_pool
                             .spawn(Self::serve_rpc(network.clone(), rpc));
                     }
+                    Ok(None) => break,
                     // All senders have disconnected. This should never happen,
                     // since the network instance itself holds a sender.
-                    Err(RecvTimeoutError::Disconnected) => break,
-                    Err(RecvTimeoutError::Timeout) => {}
+                    Err(RecvError) => break,
                 }
             }
 
@@ -387,7 +390,7 @@ mod tests {
         let sender = {
             let mut network = unlock(&network);
 
-            network.keep_running = false;
+            network.stop();
 
             network.request_bus.clone()
         };
@@ -395,7 +398,7 @@ mod tests {
             std::thread::sleep(Network::SHUTDOWN_DELAY)
         }
         let (rpc, _) = make_echo_rpc("client", "server", &[]);
-        let result = sender.send(rpc);
+        let result = sender.send(Some(rpc));
         assert!(
             result.is_err(),
             "Network is shutdown, requests should not be processed."
@@ -421,7 +424,7 @@ mod tests {
             network.request_bus.clone()
         };
 
-        let result = sender.send(rpc);
+        let result = sender.send(Some(rpc));
 
         assert!(
             result.is_ok(),