Преглед изворни кода

Make sure RPCs are cancelled when server is killed.

Jing Yang пре 5 година
родитељ
комит
f5029e1d48
1 измењених фајлова са 57 додато и 1 уклоњено
  1. 57 1
      src/network.rs

+ 57 - 1
src/network.rs

@@ -231,6 +231,15 @@ impl Network {
             }
         }
 
+        // Fail the RPC if the client has been disconnected.
+        let reply = match network.lock().dispatch(&rpc.client) {
+            Ok(_) => reply,
+            Err(_e) => Err(std::io::Error::new(
+                std::io::ErrorKind::ConnectionReset,
+                format!("Network connection has been reset."),
+            )),
+        };
+
         if let Err(_e) = rpc.reply_channel.send(reply) {
             // TODO(ditsing): log and do nothing.
         }
@@ -336,7 +345,7 @@ mod tests {
         },
         make_aborting_rpc, make_echo_rpc,
     };
-    use crate::{ReplyMessage, RequestMessage, Result};
+    use crate::{ReplyMessage, RequestMessage, Result, RpcHandler};
 
     use super::*;
 
@@ -466,6 +475,53 @@ mod tests {
         Ok(())
     }
 
+    pub struct BlockingRpcHandler {
+        start_barrier: Arc<Barrier>,
+        end_barrier: Arc<Barrier>,
+    }
+
+    impl RpcHandler for BlockingRpcHandler {
+        fn call(&self, data: RequestMessage) -> ReplyMessage {
+            self.start_barrier.wait();
+            self.end_barrier.wait();
+            data.into()
+        }
+    }
+
+    #[test]
+    fn test_server_killed() -> Result<()> {
+        let start_barrier = Arc::new(Barrier::new(2));
+        let end_barrier = Arc::new(Barrier::new(2));
+        let network = Network::run_daemon();
+        let mut server = Server::make_server(TEST_SERVER);
+        server.register_rpc_handler(
+            "blocking".to_owned(),
+            Box::new(BlockingRpcHandler {
+                start_barrier: start_barrier.clone(),
+                end_barrier: end_barrier.clone(),
+            }),
+        )?;
+
+        unlock(&network).add_server(TEST_SERVER, server);
+        let client = unlock(&network).make_client(TEST_CLIENT, TEST_SERVER);
+
+        std::thread::spawn(move || {
+            start_barrier.wait();
+            unlock(&network).remove_server(TEST_SERVER);
+            end_barrier.wait();
+        });
+
+        let reply = futures::executor::block_on(
+            client.call_rpc("blocking".to_owned(), Default::default()),
+        );
+
+        let err = reply
+            .expect_err("Client should receive error after server is killed");
+        assert_eq!(std::io::ErrorKind::ConnectionReset, err.kind());
+
+        Ok(())
+    }
+
     fn make_network_and_client() -> (Arc<Mutex<Network>>, Client) {
         let network = Network::run_daemon();