Bladeren bron

Add tracing to network.

So that we could debug both the network and the RPC servers.
Jing Yang 5 jaren geleden
bovenliggende
commit
7373467214
6 gewijzigde bestanden met toevoegingen van 180 en 2 verwijderingen
  1. 4 0
      Cargo.toml
  2. 45 2
      src/client.rs
  3. 19 0
      src/lib.rs
  4. 33 0
      src/network.rs
  5. 2 0
      src/test_utils/mod.rs
  6. 77 0
      src/tracing.rs

+ 4 - 0
Cargo.toml

@@ -13,3 +13,7 @@ futures = { version = "0.3.5", features = ["thread-pool"] }
 parking_lot = "0.11.0"
 rand = "0.7.3"
 tokio = { version = "0.3", features = ["rt-multi-thread", "time", "parking_lot"] }
+
+[features]
+default = []
+tracing = []

+ 45 - 2
src/client.rs

@@ -1,5 +1,7 @@
 use crossbeam_channel::Sender;
 
+#[cfg(feature = "tracing")]
+use crate::tracing::{Trace, TraceHolder};
 use crate::{
     ClientIdentifier, ReplyMessage, RequestMessage, Result, RpcOnWire,
     ServerIdentifier,
@@ -31,6 +33,25 @@ impl Client {
         service_method: String,
         request: RequestMessage,
     ) -> Result<ReplyMessage> {
+        #[cfg(feature = "tracing")]
+        {
+            let trace = TraceHolder::make();
+            self.trace_and_call_rpc(service_method, request, trace)
+                .await
+        }
+        #[cfg(not(feature = "tracing"))]
+        self.trace_and_call_rpc(service_method, request).await
+    }
+
+    async fn trace_and_call_rpc(
+        &self,
+        service_method: String,
+        request: RequestMessage,
+        #[cfg(feature = "tracing")] trace: TraceHolder,
+    ) -> Result<ReplyMessage> {
+        #[cfg(feature = "tracing")]
+        let local_trace = trace.clone();
+
         let (tx, rx) = futures::channel::oneshot::channel();
         let rpc = RpcOnWire {
             client: self.client.clone(),
@@ -38,8 +59,11 @@ impl Client {
             service_method,
             request,
             reply_channel: tx,
+            #[cfg(feature = "tracing")]
+            trace,
         };
 
+        mark_trace!(local_trace, assemble);
         self.request_bus.send(rpc).map_err(|e| {
             // The receiving end has been closed. Network connection is broken.
             std::io::Error::new(
@@ -50,15 +74,34 @@ impl Client {
                 ),
             )
         })?;
+        mark_trace!(local_trace, enqueue);
 
-        rx.await.map_err(|e| {
+        let ret = rx.await.map_err(|e| {
             std::io::Error::new(
                 // The network closed our connection. The server might not even
                 // get a chance to see the request.
                 std::io::ErrorKind::ConnectionAborted,
                 format!("Network request is dropped: {}", e),
             )
-        })?
+        })?;
+        mark_trace!(local_trace, response);
+        ret
+    }
+
+    #[cfg(feature = "tracing")]
+    pub async fn trace_rpc(
+        &self,
+        service_method: String,
+        request: RequestMessage,
+    ) -> (Result<ReplyMessage>, Trace) {
+        let trace = TraceHolder::make();
+        let local_trace = trace.clone();
+
+        let response = self
+            .trace_and_call_rpc(service_method, request, trace)
+            .await;
+
+        (response, local_trace.extract())
     }
 }
 

+ 19 - 0
src/lib.rs

@@ -4,6 +4,21 @@ extern crate futures;
 extern crate rand;
 extern crate tokio;
 
+#[cfg(feature = "tracing")]
+mod tracing;
+
+#[cfg(feature = "tracing")]
+macro_rules! mark_trace {
+    ($trace:expr, $name:ident) => {
+        $crate::mark!($trace, $name)
+    };
+}
+
+#[cfg(not(feature = "tracing"))]
+macro_rules! mark_trace {
+    ($trace:expr, $name:ident) => {};
+}
+
 mod client;
 mod network;
 mod server;
@@ -13,6 +28,8 @@ pub use client::Client;
 pub use network::Network;
 pub use server::RpcHandler;
 pub use server::Server;
+#[cfg(feature = "tracing")]
+pub use tracing::Trace;
 
 // Messages passed on network.
 struct RpcOnWire {
@@ -23,6 +40,8 @@ struct RpcOnWire {
     request: RequestMessage,
 
     reply_channel: futures::channel::oneshot::Sender<Result<ReplyMessage>>,
+    #[cfg(feature = "tracing")]
+    trace: tracing::TraceHolder,
 }
 
 pub type RequestMessage = bytes::Bytes;

+ 33 - 0
src/network.rs

@@ -160,6 +160,7 @@ impl Network {
                 network.long_delays,
             )
         };
+        mark_trace!(rpc.trace, dispatched);
 
         // Random delay before sending requests to server.
         if !reliable {
@@ -177,6 +178,7 @@ impl Network {
                     std::io::ErrorKind::TimedOut,
                     "Remote server did not respond in time.",
                 )));
+                mark_trace!(rpc.trace, served);
                 return;
             }
         }
@@ -189,6 +191,7 @@ impl Network {
                 let data = rpc.request.clone();
                 lookup_result.replace(server.clone());
                 // No need to set timeout. The RPCs are not supposed to block.
+                mark_trace!(rpc.trace, before_serving);
                 server.dispatch(rpc.service_method, data).await
             }
             // If the server does not exist, return error after a random delay.
@@ -205,6 +208,7 @@ impl Network {
                 Err(e)
             }
         };
+        mark_trace!(rpc.trace, after_serving);
 
         if reply.is_ok() {
             // The lookup must have succeeded.
@@ -223,6 +227,7 @@ impl Network {
                     std::io::ErrorKind::ConnectionReset,
                     "Network connection has been reset.".to_owned(),
                 )));
+                mark_trace!(rpc.trace, served);
                 return;
             }
             // Random drop again.
@@ -233,6 +238,7 @@ impl Network {
                     std::io::ErrorKind::TimedOut,
                     "The network did not send respond in time.",
                 )));
+                mark_trace!(rpc.trace, served);
                 return;
             } else if long_reordering {
                 let should_reorder = thread_rng().gen_ratio(
@@ -255,6 +261,7 @@ impl Network {
         if let Err(_e) = rpc.reply_channel.send(reply) {
             // TODO(ditsing): log and do nothing.
         }
+        mark_trace!(rpc.trace, served);
     }
 
     pub fn run_daemon() -> Arc<Mutex<Network>> {
@@ -291,6 +298,7 @@ impl Network {
 
                 match rx.recv_timeout(Self::SHUTDOWN_DELAY / 2) {
                     Ok(rpc) => {
+                        mark_trace!(rpc.trace, dequeue);
                         thread_pool
                             .spawn(Self::serve_rpc(network.clone(), rpc));
                     }
@@ -666,4 +674,29 @@ mod tests {
         }
         eprintln!("Many requests test took {:?}", now.elapsed());
     }
+
+    // Typical request takes about 80ms.
+    #[cfg(feature = "tracing")]
+    #[test]
+    fn test_tracing() -> Result<()> {
+        let (_, client) = make_network_and_client();
+
+        let (response, trace) = futures::executor::block_on(
+            client.trace_rpc(JunkRpcs::Echo.name(), RequestMessage::new()),
+        );
+        assert!(response.is_ok());
+
+        assert!(trace.assemble > Duration::from_secs(0));
+        assert!(trace.enqueue > trace.assemble);
+        // Dequeue can happen before enqueue, as they are on different threads.
+        // assert!(trace.dequeue > trace.enqueue, "{:?}", trace);
+        assert!(trace.dispatched > trace.dequeue);
+        assert!(trace.before_serving > trace.dispatched);
+        assert!(trace.after_serving > trace.before_serving);
+        assert!(trace.served >= trace.after_serving, "{:?}", trace);
+        // Response can be before serve, as they are on different threads.
+        // assert!(trace.response >= trace.served);
+
+        Ok(())
+    }
 }

+ 2 - 0
src/test_utils/mod.rs

@@ -33,6 +33,8 @@ pub(crate) fn make_rpc<C: Into<String>, S: Into<String>>(
             service_method: service_method.name(),
             request: RequestMessage::copy_from_slice(data),
             reply_channel: tx,
+            #[cfg(feature = "tracing")]
+            trace: crate::tracing::TraceHolder::make(),
         },
         rx,
     )

+ 77 - 0
src/tracing.rs

@@ -0,0 +1,77 @@
+#![cfg(feature = "tracing")]
+
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use parking_lot::Mutex;
+
+#[derive(Clone, Debug)]
+pub struct Trace {
+    /// When the trace is created by the client.
+    pub created_at: Instant,
+    /// Delay from creation until the request is assembled.
+    pub assemble: Duration,
+    /// Delay from creation until the request is sent to the network.
+    pub enqueue: Duration,
+    /// Delay from creation until the network receives the request.
+    pub dequeue: Duration,
+    /// Delay from creation until the request is sent to the server.
+    pub dispatched: Duration,
+    /// Delay from creation until the server is about to process the request.
+    pub before_serving: Duration,
+    /// Delay from creation until the server has served the request.
+    pub after_serving: Duration,
+    /// Delay from creation until the network proxies the server's response.
+    pub served: Duration,
+    /// Delay from creation until the client receives the network's response.
+    pub response: Duration,
+}
+
+#[macro_export]
+macro_rules! mark {
+    ($trace:expr, $name:ident) => {{
+        let mut trace = $trace.inner.lock();
+        trace.$name = std::time::Instant::now() - trace.created_at;
+    }};
+}
+
+impl Trace {
+    pub(crate) fn start() -> Trace {
+        Self {
+            created_at: Instant::now(),
+            assemble: Default::default(),
+            enqueue: Default::default(),
+            dequeue: Default::default(),
+            dispatched: Default::default(),
+            before_serving: Default::default(),
+            after_serving: Default::default(),
+            served: Default::default(),
+            response: Default::default(),
+        }
+    }
+}
+
+// Clone is required because the client-side code needs to mark a trace
+// after sending an RPC.
+#[derive(Clone)]
+pub(crate) struct TraceHolder {
+    pub(crate) inner: Arc<Mutex<Trace>>,
+}
+
+impl TraceHolder {
+    pub(crate) fn make() -> Self {
+        let inner = Trace::start();
+        Self {
+            inner: Arc::new(Mutex::new(inner)),
+        }
+    }
+
+    pub(crate) fn extract(&self) -> Trace {
+        // Another way to do it would be to unwrap the Arc.
+        let mut trace = self.inner.lock().clone();
+        if trace.served.as_nanos() == 0 {
+            trace.served = trace.response
+        }
+        trace
+    }
+}