Kaynağa Gözat

Expose trace externally for better debugability.

Also added more details to tracing.
Jing Yang 4 yıl önce
ebeveyn
işleme
051353fe7a
3 değiştirilmiş dosya ile 41 ekleme ve 4 silme
  1. 12 1
      src/network.rs
  2. 14 2
      src/server.rs
  3. 15 1
      src/tracing.rs

+ 12 - 1
src/network.rs

@@ -195,7 +195,18 @@ impl Network {
                 lookup_result.replace(server.clone());
                 // No need to set timeout. The RPCs are not supposed to block.
                 mark_trace!(rpc.trace, before_serving);
-                server.dispatch(rpc.service_method, data).await
+                #[cfg(not(feature = "tracing"))]
+                {
+                    server.dispatch(rpc.service_method, data).await
+                }
+                #[cfg(feature = "tracing")]
+                {
+                    let res = server
+                        .dispatch(rpc.service_method, data, rpc.trace.clone())
+                        .await;
+                    mark_trace!(rpc.trace, after_server_response);
+                    res
+                }
             }
             // If the server does not exist, return error after a random delay.
             Err(e) => {

+ 14 - 2
src/server.rs

@@ -3,6 +3,8 @@ use std::sync::Arc;
 
 use parking_lot::Mutex;
 
+#[cfg(feature = "tracing")]
+use crate::tracing::TraceHolder;
 use crate::{ReplyMessage, RequestMessage, Result, ServerIdentifier};
 
 pub type RpcHandler = dyn Fn(RequestMessage) -> ReplyMessage;
@@ -30,9 +32,13 @@ impl Server {
         self: Arc<Self>,
         service_method: String,
         data: RequestMessage,
+        #[cfg(feature = "tracing")] trace: TraceHolder,
     ) -> Result<ReplyMessage> {
         let (tx, rx) = futures::channel::oneshot::channel();
         let this = self.clone();
+        mark_trace!(trace, before_server_scheduling);
+        #[cfg(feature = "tracing")]
+        let trace_clone = trace.clone();
         this.thread_pool.spawn_ok(async move {
             let rpc_handler = {
                 // Blocking on a mutex in a thread pool. Sounds horrible, but
@@ -41,6 +47,7 @@ impl Server {
                 state.rpc_count.set(state.rpc_count.get() + 1);
                 state.rpc_handlers.get(&service_method).cloned()
             };
+            mark_trace!(trace_clone, before_handling);
             let response = match rpc_handler {
                 Some(rpc_handler) => Ok(rpc_handler(data)),
                 None => Err(std::io::Error::new(
@@ -51,18 +58,23 @@ impl Server {
                     ),
                 )),
             };
+            mark_trace!(trace_clone, after_handling);
             #[allow(clippy::redundant_pattern_matching)]
             if let Err(_) = tx.send(response) {
                 // Receiving end is dropped. Never mind.
                 // Do nothing.
             }
+            mark_trace!(trace_clone, handler_response);
         });
-        rx.await.map_err(|_e| {
+        mark_trace!(trace, after_server_scheduling);
+        let ret = rx.await.map_err(|_e| {
             std::io::Error::new(
                 std::io::ErrorKind::ConnectionReset,
                 format!("Remote server {} cancelled the RPC.", this.name),
             )
-        })?
+        })?;
+        mark_trace!(trace, server_response);
+        ret
     }
 
     pub fn register_rpc_handler(

+ 15 - 1
src/tracing.rs

@@ -19,6 +19,13 @@ pub struct Trace {
     pub dispatched: Duration,
     /// The delay of when the request is about to be processed by the server.
     pub before_serving: Duration,
+    pub before_server_scheduling: Duration,
+    pub after_server_scheduling: Duration,
+    pub before_handling: Duration,
+    pub after_handling: Duration,
+    pub handler_response: Duration,
+    pub server_response: Duration,
+    pub after_server_response: Duration,
     /// The delay of when the request is served by the server.
     pub after_serving: Duration,
     /// The delay of when the network proxies the response from the server.
@@ -44,6 +51,13 @@ impl Trace {
             dequeue: Default::default(),
             dispatched: Default::default(),
             before_serving: Default::default(),
+            before_server_scheduling: Default::default(),
+            after_server_scheduling: Default::default(),
+            before_handling: Default::default(),
+            after_handling: Default::default(),
+            handler_response: Default::default(),
+            server_response: Default::default(),
+            after_server_response: Default::default(),
             after_serving: Default::default(),
             served: Default::default(),
             response: Default::default(),
@@ -54,7 +68,7 @@ impl Trace {
 // Clone is required because the client side code need to mark a trace
 // after sending an RPC.
 #[derive(Clone)]
-pub(crate) struct TraceHolder {
+pub struct TraceHolder {
     pub(crate) inner: Arc<Mutex<Trace>>,
 }