Переглянути джерело

Cosmetic changes: import, rustfmt and messages.

Jing Yang 5 роки тому
батько
коміт
2b8bc5d772
5 змінених файлів з 51 додано та 27 видалено
  1. 2 1
      src/client.rs
  2. 21 11
      src/network.rs
  3. 21 9
      src/server.rs
  4. 5 5
      src/test_utils/junk_server.rs
  5. 2 1
      src/test_utils/mod.rs

+ 2 - 1
src/client.rs

@@ -1,8 +1,9 @@
+use std::sync::mpsc::Sender;
+
 use crate::{
     ClientIdentifier, ReplyMessage, RequestMessage, Result, RpcOnWire,
     ServerIdentifier,
 };
-use std::sync::mpsc::Sender;
 
 // Client interface, used by the RPC client.
 pub struct Client {

+ 21 - 11
src/network.rs

@@ -3,12 +3,12 @@ use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
 
-use crate::Client;
-use crate::Result;
-use crate::Server;
-use crate::{ClientIdentifier, RpcOnWire, ServerIdentifier};
 use rand::{thread_rng, Rng};
 
+use crate::{
+    Client, ClientIdentifier, Result, RpcOnWire, Server, ServerIdentifier,
+};
+
 pub struct Network {
     // Settings.
     reliable: bool,
@@ -227,7 +227,7 @@ impl Network {
         let rx = network
             .request_pipe
             .take()
-            .expect("Newly created network should have a rx.");
+            .expect("Newly created network should have a rx");
 
         let network = Arc::new(Mutex::new(network));
 
@@ -307,9 +307,12 @@ impl Network {
 
 #[cfg(test)]
 mod tests {
-    use super::*;
+    use std::sync::MutexGuard;
+
     use crate::test_utils::make_echo_rpc;
 
+    use super::*;
+
     fn make_network() -> Network {
         Network::new()
     }
@@ -323,19 +326,26 @@ mod tests {
         assert_eq!(1, network.get_total_rpc_count());
     }
 
+    fn unlock<T>(network: &Arc<Mutex<T>>) -> MutexGuard<T> {
+        network
+            .lock()
+            .expect("Network mutex should not be poisoned")
+    }
+
     #[test]
     fn test_network_shutdown() {
         let network = Network::run_daemon();
-        while !network.lock().unwrap().is_running() {
+        while !unlock(&network).is_running() {
             std::thread::sleep(Network::SHUTDOWN_DELAY)
         }
         let sender = {
-            let mut network = network.lock().unwrap();
-            let sender = network.request_bus.clone();
+            let mut network = unlock(&network);
+
             network.keep_running = false;
-            sender
+
+            network.request_bus.clone()
         };
-        while network.lock().unwrap().is_running() {
+        while unlock(&network).is_running() {
             std::thread::sleep(Network::SHUTDOWN_DELAY)
         }
         let (rpc, _) = make_echo_rpc("client", "server");

+ 21 - 9
src/server.rs

@@ -1,7 +1,8 @@
-use crate::{ReplyMessage, RequestMessage, Result};
 use std::collections::hash_map::Entry::Vacant;
 use std::sync::Arc;
 
+use crate::{ReplyMessage, RequestMessage, Result};
+
 pub trait RpcHandler {
     // Note this method is not async.
     fn call(&self, data: RequestMessage) -> ReplyMessage;
@@ -40,7 +41,7 @@ impl Server {
                 let state = self
                     .state
                     .lock()
-                    .expect("The server state mutex should not be poisoned.");
+                    .expect("The server state mutex should not be poisoned");
                 state.rpc_count.set(state.rpc_count.get() + 1);
                 state.rpc_handlers.get(&service_method).map(|r| r.clone())
             };
@@ -75,7 +76,7 @@ impl Server {
         let mut state = self
             .state
             .lock()
-            .expect("The server state mutex should not be poisoned.");
+            .expect("The server state mutex should not be poisoned");
         let debug_service_method = service_method.clone();
         if let Vacant(vacant) = state.rpc_handlers.entry(service_method) {
             vacant.insert(Arc::new(rpc_handler));
@@ -100,7 +101,7 @@ impl Server {
             .name_prefix(name.clone())
             .pool_size(Self::THREAD_POOL_SIZE)
             .create()
-            .expect("Creating thread pools should not fail.");
+            .expect("Creating thread pools should not fail");
         Self {
             name,
             state,
@@ -111,21 +112,32 @@ impl Server {
 
 #[cfg(test)]
 mod tests {
-    use super::*;
     use crate::test_utils::junk_server::{make_server, EchoRpcHandler};
 
+    use super::*;
+
+    fn rpc_handlers_len(server: &Server) -> usize {
+        server
+            .state
+            .lock()
+            .expect("The server state mutex should not be poisoned.")
+            .rpc_handlers
+            .len()
+    }
+
     #[test]
     fn test_register_rpc_handler() -> Result<()> {
         let server = make_server();
 
-        assert_eq!(2, server.state.lock().unwrap().rpc_handlers.len());
+        assert_eq!(2, rpc_handlers_len(server.as_ref()));
         Ok(())
     }
 
     #[test]
     fn test_register_rpc_handler_failure() -> Result<()> {
         let mut server = make_server();
-        let server = std::sync::Arc::get_mut(&mut server).unwrap();
+        let server = std::sync::Arc::get_mut(&mut server)
+            .expect("Server should only be held by the current thread");
 
         let result = server.register_rpc_handler(
             "echo".to_string(),
@@ -133,7 +145,7 @@ mod tests {
         );
 
         assert!(result.is_err());
-        assert_eq!(2, server.state.lock().unwrap().rpc_handlers.len());
+        assert_eq!(2, rpc_handlers_len(server));
         Ok(())
     }
 
@@ -174,7 +186,7 @@ mod tests {
         assert_eq!(
             reply
                 .err()
-                .expect("Aborting RPC should return error.")
+                .expect("Aborting RPC should return error")
                 .kind(),
             std::io::ErrorKind::ConnectionReset,
         );

+ 5 - 5
src/test_utils/junk_server.rs

@@ -1,12 +1,12 @@
-use crate::{server::RpcHandler, ReplyMessage, RequestMessage, Server};
-use bytes::BytesMut;
 use std::sync::Arc;
 
+use crate::{server::RpcHandler, ReplyMessage, RequestMessage, Server};
+
 pub struct EchoRpcHandler {}
 
 impl RpcHandler for EchoRpcHandler {
     fn call(&self, request: RequestMessage) -> ReplyMessage {
-        let mut reply = BytesMut::from(request.as_ref());
+        let mut reply = bytes::BytesMut::from(request.as_ref());
         reply.reverse();
         reply.freeze()
     }
@@ -24,12 +24,12 @@ pub fn make_server() -> Arc<Server> {
     let mut server = Server::make_server("test-server".to_string());
     server
         .register_rpc_handler("echo".to_string(), Box::new(EchoRpcHandler {}))
-        .expect("Registering the first RPC handler should not fail.");
+        .expect("Registering the first RPC handler should not fail");
     server
         .register_rpc_handler(
             "aborting".to_string(),
             Box::new(AbortingRpcHandler {}),
         )
-        .expect("Registering the second RPC handler should not fail.");
+        .expect("Registering the second RPC handler should not fail");
     Arc::new(server)
 }

+ 2 - 1
src/test_utils/mod.rs

@@ -1,6 +1,7 @@
-use crate::{ReplyMessage, RequestMessage, Result, RpcOnWire};
 use futures::channel::oneshot::Receiver;
 
+use crate::{ReplyMessage, RequestMessage, Result, RpcOnWire};
+
 pub(crate) mod junk_server;
 
 pub(crate) fn make_echo_rpc<C: Into<String>, S: Into<String>>(