浏览代码

Implement random delay and random drop.

Jing Yang 5 年之前
父节点
当前提交
3bbc50d620
共有 3 个文件被更改,包括 92 次插入6 次删除
  1. 3 1
      Cargo.toml
  2. 2 0
      src/lib.rs
  3. 87 5
      src/network.rs

+ 3 - 1
Cargo.toml

@@ -6,4 +6,6 @@ edition = "2018"
 
 [dependencies]
 bytes = "0.5.6"
-futures = { version = "0.3.5", features =[ "thread-pool" ] }
+futures = { version = "0.3.5", features =[ "thread-pool" ] }
+rand = "0.7.3"
+tokio-timer = "0.3.0-alpha.6"

+ 2 - 0
src/lib.rs

@@ -1,5 +1,7 @@
 extern crate bytes;
 extern crate futures;
+extern crate rand;
+extern crate tokio_timer;
 
 mod client;
 mod network;

+ 87 - 5
src/network.rs

@@ -7,6 +7,7 @@ use crate::Client;
 use crate::Result;
 use crate::Server;
 use crate::{ClientIdentifier, RpcOnWire, ServerIdentifier};
+use rand::{Rng, thread_rng};
 
 pub struct Network {
     // Settings.
@@ -89,27 +90,108 @@ impl Network {
 }
 
 impl Network {
+    const MAX_MINOR_DELAY_MILLIS: u64 = 27;
+    const MAX_SHORT_DELAY_MILLIS: u64 = 100;
+    const MAX_LONG_DELAY_MILLIS: u64 = 7000;
+
+    const DROP_RATE: (u32, u32) = (100, 1000);
+    const LONG_REORDERING_RATE: (u32, u32) = (600u32, 900u32);
+
+    const LONG_REORDERING_BASE_DELAY_MILLIS: u64 = 200;
+    const LONG_REORDERING_RANDOM_DELAY_BOUND_MILLIS: u64 = 2000;
+
+    async fn delay_for_millis(milli_seconds: u64) {
+        tokio_timer::delay_for(
+            Duration::from_millis(milli_seconds),
+        ).await;
+    }
+
     async fn serve_rpc(network: Arc<Mutex<Self>>, rpc: RpcOnWire) {
-        let server_result = {
+        let (server_result, reliable, long_reordering, long_delays) = {
             let network = network
                 .lock()
                 .expect("Network mutex should not be poisoned");
             network.increase_rpc_count();
 
-            network.dispatch(&rpc.client)
+            (
+                network.dispatch(&rpc.client),
+                network.reliable,
+                network.long_reordering,
+                network.long_delays,
+            )
         };
 
-        // Cannot use server_result.amp() here, since there
-        // is an async request in the middle.
+        // Random delay before sending requests to server.
+        if !reliable {
+            let minor_delay =
+                thread_rng().gen_range(0, Self::MAX_MINOR_DELAY_MILLIS);
+            Self::delay_for_millis(minor_delay).await;
+
+            // Random drop of a DROP_RATE / DROP_BASE chance.
+            if thread_rng().gen_ratio(Self::DROP_RATE.0, Self::DROP_RATE.1) {
+                // Note this is different from the original Go version.
+                // Here we don't reply to client until timeout actually passes.
+                Self::delay_for_millis(Self::MAX_MINOR_DELAY_MILLIS).await;
+
+                let _ = rpc.reply_channel.send(Err(
+                    std::io::Error::new(
+                        std::io::ErrorKind::TimedOut,
+                        "Remote server did not respond in time.",
+                    )
+                ));
+                return
+            }
+        }
+
         let reply = match server_result {
+            // Call the server.
             Ok(server) => {
                 // Simulates the copy from network to server.
                 let data = rpc.request.clone();
                 server.dispatch(rpc.service_method, data).await
             }
-            Err(e) => Err(e),
+            // If the server does not exist, return error after a random delay.
+            Err(e) => {
+                let long_delay = rand::thread_rng().gen_range(
+                    0,
+                    if long_delays {
+                        Self::MAX_LONG_DELAY_MILLIS
+                    } else {
+                        Self::MAX_SHORT_DELAY_MILLIS
+                    });
+                Self::delay_for_millis(long_delay).await;
+                Err(e)
+            }
         };
 
+        if reply.is_ok() {
+            // Random drop again.
+            if thread_rng().gen_ratio(Self::DROP_RATE.0, Self::DROP_RATE.1) {
+                let _ = rpc.reply_channel.send(Err(
+                    std::io::Error::new(
+                        std::io::ErrorKind::TimedOut,
+                        "The network did not send respond in time.",
+                    )
+                ));
+                return
+            } else if long_reordering {
+                let should_reorder = thread_rng().gen_ratio(
+                    Self::LONG_REORDERING_RATE.0,
+                    Self::LONG_REORDERING_RATE.1
+                );
+                if should_reorder {
+                    let long_delay_bound = thread_rng().gen_range(
+                        0,
+                        Self::LONG_REORDERING_RANDOM_DELAY_BOUND_MILLIS
+                    );
+                    let long_delay = Self::LONG_REORDERING_BASE_DELAY_MILLIS +
+                        thread_rng().gen_range(0, 1 + long_delay_bound);
+                    Self::delay_for_millis(long_delay).await;
+                    // Falling through to send the result.
+                }
+            }
+        }
+
         if let Err(_e) = rpc.reply_channel.send(reply) {
             // TODO(ditsing): log and do nothing.
         }