Просмотр исходного кода

Add is_running and shutdown test to network.

Jing Yang 5 лет назад
Родитель
Сommit
d3f711ac66
3 измененных файлов с 63 добавлено и 6 удалено
  1. 42 5
      src/network.rs
  2. 1 1
      src/test_utils/junk_server.rs
  3. 20 0
      src/test_utils/mod.rs

+ 42 - 5
src/network.rs

@@ -25,6 +25,8 @@ pub struct Network {
 
     // Closing signal.
     keep_running: bool,
+    // Whether the network is active or not.
+    is_running: bool,
 
     // RPC Counter, using Cell for interior mutability.
     rpc_count: std::cell::Cell<usize>,
@@ -47,11 +49,16 @@ impl Network {
         self.keep_running = false;
     }
 
-    pub fn make_client(
+    pub fn is_running(&self) -> bool {
+        self.is_running
+    }
+
+    pub fn make_client<C: Into<ClientIdentifier>, S: Into<ServerIdentifier>>(
         &mut self,
-        client: ClientIdentifier,
-        server: ServerIdentifier,
+        client: C,
+        server: S,
     ) -> Client {
+        let (client, server) = (client.into(), server.into());
         self.clients.insert(client.clone(), (true, server.clone()));
         Client {
             client,
@@ -217,7 +224,10 @@ impl Network {
 
     pub fn run_daemon() -> Arc<Mutex<Network>> {
         let mut network = Network::new();
-        let rx = network.request_pipe.take().unwrap();
+        let rx = network
+            .request_pipe
+            .take()
+            .expect("Newly created network should have a rx.");
 
         let network = Arc::new(Mutex::new(network));
 
@@ -236,12 +246,14 @@ impl Network {
                 // trying to add / remove RPC servers, or change settings.
                 // Having a shutdown delay helps minimise lock holding.
                 if stop_timer.elapsed() >= Self::SHUTDOWN_DELAY {
-                    let locked_network = network
+                    let mut locked_network = network
                         .lock()
                         .expect("Network mutex should not be poisoned");
                     if !locked_network.keep_running {
+                        locked_network.is_running = false;
                         break;
                     }
+                    locked_network.is_running = true;
                     stop_timer = Instant::now();
                 }
 
@@ -285,6 +297,7 @@ impl Network {
             request_bus: tx,
             request_pipe: Some(rx),
             keep_running: true,
+            is_running: false,
             rpc_count: std::cell::Cell::new(0),
         }
     }
@@ -293,6 +306,7 @@ impl Network {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::test_utils::make_echo_rpc;
 
     fn make_network() -> Network {
         Network::new()
@@ -306,4 +320,27 @@ mod tests {
         network.increase_rpc_count();
         assert_eq!(1, network.get_total_rpc_count());
     }
+
+    #[test]
+    fn test_network_shutdown() {
+        let network = Network::run_daemon();
+        while !network.lock().unwrap().is_running() {
+            std::thread::sleep(Network::SHUTDOWN_DELAY)
+        }
+        let sender = {
+            let mut network = network.lock().unwrap();
+            let sender = network.request_bus.clone();
+            network.keep_running = false;
+            sender
+        };
+        while network.lock().unwrap().is_running() {
+            std::thread::sleep(Network::SHUTDOWN_DELAY)
+        }
+        let (rpc, _) = make_echo_rpc("client", "server");
+        let result = sender.send(rpc);
+        assert!(
+            result.is_err(),
+            "Network is shutdown, requests should not be processed."
+        );
+    }
 }

+ 1 - 1
src/test_utils/junk_server.rs

@@ -1,6 +1,6 @@
 use crate::{server::RpcHandler, ReplyMessage, RequestMessage, Server};
-use std::sync::Arc;
 use bytes::BytesMut;
+use std::sync::Arc;
 
 pub struct EchoRpcHandler {}
 

+ 20 - 0
src/test_utils/mod.rs

@@ -1 +1,21 @@
+use crate::{ReplyMessage, RequestMessage, Result, RpcOnWire};
+use futures::channel::oneshot::Receiver;
+
 pub(crate) mod junk_server;
+
+pub(crate) fn make_echo_rpc<C: Into<String>, S: Into<String>>(
+    client: C,
+    server: S,
+) -> (RpcOnWire, Receiver<Result<ReplyMessage>>) {
+    let (tx, rx) = futures::channel::oneshot::channel();
+    (
+        RpcOnWire {
+            client: client.into(),
+            server: server.into(),
+            service_method: "echo".into(),
+            request: RequestMessage::new(),
+            reply_channel: tx,
+        },
+        rx,
+    )
+}