|
|
@@ -7,7 +7,7 @@ use crate::Client;
|
|
|
use crate::Result;
|
|
|
use crate::Server;
|
|
|
use crate::{ClientIdentifier, RpcOnWire, ServerIdentifier};
|
|
|
-use rand::{Rng, thread_rng};
|
|
|
+use rand::{thread_rng, Rng};
|
|
|
|
|
|
pub struct Network {
|
|
|
// Settings.
|
|
|
@@ -49,11 +49,15 @@ impl Network {
|
|
|
self.keep_running = false;
|
|
|
}
|
|
|
|
|
|
- pub fn make_connection(&self, client: ClientIdentifier, server: ServerIdentifier) -> Client {
|
|
|
+ pub fn make_connection(
|
|
|
+ &self,
|
|
|
+ client: ClientIdentifier,
|
|
|
+ server: ServerIdentifier,
|
|
|
+ ) -> Client {
|
|
|
Client {
|
|
|
client,
|
|
|
server,
|
|
|
- request_bus: self.request_bus.clone()
|
|
|
+ request_bus: self.request_bus.clone(),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -101,9 +105,7 @@ impl Network {
|
|
|
const LONG_REORDERING_RANDOM_DELAY_BOUND_MILLIS: u64 = 2000;
|
|
|
|
|
|
async fn delay_for_millis(milli_seconds: u64) {
|
|
|
- tokio_timer::delay_for(
|
|
|
- Duration::from_millis(milli_seconds),
|
|
|
- ).await;
|
|
|
+ tokio_timer::delay_for(Duration::from_millis(milli_seconds)).await;
|
|
|
}
|
|
|
|
|
|
async fn serve_rpc(network: Arc<Mutex<Self>>, rpc: RpcOnWire) {
|
|
|
@@ -133,13 +135,11 @@ impl Network {
|
|
|
// Here we don't reply to client until timeout actually passes.
|
|
|
Self::delay_for_millis(Self::MAX_MINOR_DELAY_MILLIS).await;
|
|
|
|
|
|
- let _ = rpc.reply_channel.send(Err(
|
|
|
- std::io::Error::new(
|
|
|
- std::io::ErrorKind::TimedOut,
|
|
|
- "Remote server did not respond in time.",
|
|
|
- )
|
|
|
- ));
|
|
|
- return
|
|
|
+ let _ = rpc.reply_channel.send(Err(std::io::Error::new(
|
|
|
+ std::io::ErrorKind::TimedOut,
|
|
|
+ "Remote server did not respond in time.",
|
|
|
+ )));
|
|
|
+ return;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -158,7 +158,8 @@ impl Network {
|
|
|
Self::MAX_LONG_DELAY_MILLIS
|
|
|
} else {
|
|
|
Self::MAX_SHORT_DELAY_MILLIS
|
|
|
- });
|
|
|
+ },
|
|
|
+ );
|
|
|
Self::delay_for_millis(long_delay).await;
|
|
|
Err(e)
|
|
|
}
|
|
|
@@ -166,26 +167,26 @@ impl Network {
|
|
|
|
|
|
if reply.is_ok() {
|
|
|
// Random drop again.
|
|
|
- if thread_rng().gen_ratio(Self::DROP_RATE.0, Self::DROP_RATE.1) {
|
|
|
- let _ = rpc.reply_channel.send(Err(
|
|
|
- std::io::Error::new(
|
|
|
- std::io::ErrorKind::TimedOut,
|
|
|
- "The network did not send respond in time.",
|
|
|
- )
|
|
|
- ));
|
|
|
- return
|
|
|
+ if !reliable
|
|
|
+ && thread_rng().gen_ratio(Self::DROP_RATE.0, Self::DROP_RATE.1)
|
|
|
+ {
|
|
|
+ let _ = rpc.reply_channel.send(Err(std::io::Error::new(
|
|
|
+ std::io::ErrorKind::TimedOut,
|
|
|
+ "The network did not send respond in time.",
|
|
|
+ )));
|
|
|
+ return;
|
|
|
} else if long_reordering {
|
|
|
let should_reorder = thread_rng().gen_ratio(
|
|
|
Self::LONG_REORDERING_RATE.0,
|
|
|
- Self::LONG_REORDERING_RATE.1
|
|
|
+ Self::LONG_REORDERING_RATE.1,
|
|
|
);
|
|
|
if should_reorder {
|
|
|
let long_delay_bound = thread_rng().gen_range(
|
|
|
0,
|
|
|
- Self::LONG_REORDERING_RANDOM_DELAY_BOUND_MILLIS
|
|
|
+ Self::LONG_REORDERING_RANDOM_DELAY_BOUND_MILLIS,
|
|
|
);
|
|
|
- let long_delay = Self::LONG_REORDERING_BASE_DELAY_MILLIS +
|
|
|
- thread_rng().gen_range(0, 1 + long_delay_bound);
|
|
|
+ let long_delay = Self::LONG_REORDERING_BASE_DELAY_MILLIS
|
|
|
+ + thread_rng().gen_range(0, 1 + long_delay_bound);
|
|
|
Self::delay_for_millis(long_delay).await;
|
|
|
// Falling through to send the result.
|
|
|
}
|