network.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. use std::collections::HashMap;
  2. use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
  3. use std::sync::{Arc, Mutex};
  4. use std::time::{Duration, Instant};
  5. use rand::{thread_rng, Rng};
  6. use crate::{
  7. Client, ClientIdentifier, Result, RpcOnWire, Server, ServerIdentifier,
  8. };
  9. pub struct Network {
  10. // Settings.
  11. reliable: bool,
  12. long_delays: bool,
  13. long_reordering: bool,
  14. // Clients
  15. clients: HashMap<ClientIdentifier, (bool, ServerIdentifier)>,
  16. servers: HashMap<ServerIdentifier, Arc<Server>>,
  17. // Network bus
  18. request_bus: Sender<RpcOnWire>,
  19. request_pipe: Option<Receiver<RpcOnWire>>,
  20. // Closing signal.
  21. keep_running: bool,
  22. // Whether the network is active or not.
  23. is_running: bool,
  24. // RPC Counter, using Cell for interior mutability.
  25. rpc_count: std::cell::Cell<usize>,
  26. }
  27. impl Network {
  28. pub fn set_reliable(&mut self, yes: bool) {
  29. self.reliable = yes
  30. }
  31. pub fn set_long_reordering(&mut self, yes: bool) {
  32. self.long_reordering = yes
  33. }
  34. pub fn set_long_delays(&mut self, yes: bool) {
  35. self.long_delays = yes
  36. }
  37. pub fn stop(&mut self) {
  38. self.keep_running = false;
  39. }
  40. pub fn is_running(&self) -> bool {
  41. self.is_running
  42. }
  43. pub fn make_client<C: Into<ClientIdentifier>, S: Into<ServerIdentifier>>(
  44. &mut self,
  45. client: C,
  46. server: S,
  47. ) -> Client {
  48. let (client, server) = (client.into(), server.into());
  49. self.clients.insert(client.clone(), (true, server.clone()));
  50. Client {
  51. client,
  52. server,
  53. request_bus: self.request_bus.clone(),
  54. }
  55. }
  56. pub fn set_enable_client(&mut self, client: &ClientIdentifier, yes: bool) {
  57. self.clients.get_mut(client).map(|pair| pair.0 = yes);
  58. }
  59. pub fn add_server(
  60. &mut self,
  61. server_name: ServerIdentifier,
  62. server: Arc<Server>,
  63. ) {
  64. self.servers.insert(server_name, server);
  65. }
  66. pub fn remove_server(&mut self, server_name: &ServerIdentifier) {
  67. self.servers.remove(server_name);
  68. }
  69. fn dispatch(&self, client: &ClientIdentifier) -> Result<Arc<Server>> {
  70. let (enabled, server_name) =
  71. self.clients.get(client).ok_or_else(|| {
  72. std::io::Error::new(
  73. std::io::ErrorKind::NotConnected,
  74. format!("Client {} is not connected.", client),
  75. )
  76. })?;
  77. if !enabled {
  78. return Err(std::io::Error::new(
  79. std::io::ErrorKind::BrokenPipe,
  80. format!("Client {} is disabled.", client),
  81. ));
  82. }
  83. let server = self.servers.get(server_name).ok_or_else(|| {
  84. std::io::Error::new(
  85. std::io::ErrorKind::NotFound,
  86. format!(
  87. "Cannot connect {} to server {}: server not found.",
  88. client, server_name,
  89. ),
  90. )
  91. })?;
  92. Ok(server.clone())
  93. }
  94. pub fn get_total_rpc_count(&self) -> usize {
  95. self.rpc_count.get()
  96. }
  97. }
  98. impl Network {
  99. const MAX_MINOR_DELAY_MILLIS: u64 = 27;
  100. const MAX_SHORT_DELAY_MILLIS: u64 = 100;
  101. const MAX_LONG_DELAY_MILLIS: u64 = 7000;
  102. const DROP_RATE: (u32, u32) = (100, 1000);
  103. const LONG_REORDERING_RATE: (u32, u32) = (600u32, 900u32);
  104. const LONG_REORDERING_BASE_DELAY_MILLIS: u64 = 200;
  105. const LONG_REORDERING_RANDOM_DELAY_BOUND_MILLIS: u64 = 2000;
  106. const SHUTDOWN_DELAY: Duration = Duration::from_micros(20);
  107. async fn delay_for_millis(milli_seconds: u64) {
  108. tokio::time::delay_for(Duration::from_millis(milli_seconds)).await;
  109. }
  110. async fn serve_rpc(network: Arc<Mutex<Self>>, rpc: RpcOnWire) {
  111. let (server_result, reliable, long_reordering, long_delays) = {
  112. let network = network
  113. .lock()
  114. .expect("Network mutex should not be poisoned");
  115. network.increase_rpc_count();
  116. (
  117. network.dispatch(&rpc.client),
  118. network.reliable,
  119. network.long_reordering,
  120. network.long_delays,
  121. )
  122. };
  123. // Random delay before sending requests to server.
  124. if !reliable {
  125. let minor_delay =
  126. thread_rng().gen_range(0, Self::MAX_MINOR_DELAY_MILLIS);
  127. Self::delay_for_millis(minor_delay).await;
  128. // Random drop of a DROP_RATE / DROP_BASE chance.
  129. if thread_rng().gen_ratio(Self::DROP_RATE.0, Self::DROP_RATE.1) {
  130. // Note this is different from the original Go version.
  131. // Here we don't reply to client until timeout actually passes.
  132. Self::delay_for_millis(Self::MAX_MINOR_DELAY_MILLIS).await;
  133. let _ = rpc.reply_channel.send(Err(std::io::Error::new(
  134. std::io::ErrorKind::TimedOut,
  135. "Remote server did not respond in time.",
  136. )));
  137. return;
  138. }
  139. }
  140. let reply = match server_result {
  141. // Call the server.
  142. Ok(server) => {
  143. // Simulates the copy from network to server.
  144. let data = rpc.request.clone();
  145. server.dispatch(rpc.service_method, data).await
  146. }
  147. // If the server does not exist, return error after a random delay.
  148. Err(e) => {
  149. let long_delay = rand::thread_rng().gen_range(
  150. 0,
  151. if long_delays {
  152. Self::MAX_LONG_DELAY_MILLIS
  153. } else {
  154. Self::MAX_SHORT_DELAY_MILLIS
  155. },
  156. );
  157. Self::delay_for_millis(long_delay).await;
  158. Err(e)
  159. }
  160. };
  161. if reply.is_ok() {
  162. // Random drop again.
  163. if !reliable
  164. && thread_rng().gen_ratio(Self::DROP_RATE.0, Self::DROP_RATE.1)
  165. {
  166. let _ = rpc.reply_channel.send(Err(std::io::Error::new(
  167. std::io::ErrorKind::TimedOut,
  168. "The network did not send respond in time.",
  169. )));
  170. return;
  171. } else if long_reordering {
  172. let should_reorder = thread_rng().gen_ratio(
  173. Self::LONG_REORDERING_RATE.0,
  174. Self::LONG_REORDERING_RATE.1,
  175. );
  176. if should_reorder {
  177. let long_delay_bound = thread_rng().gen_range(
  178. 0,
  179. Self::LONG_REORDERING_RANDOM_DELAY_BOUND_MILLIS,
  180. );
  181. let long_delay = Self::LONG_REORDERING_BASE_DELAY_MILLIS
  182. + thread_rng().gen_range(0, 1 + long_delay_bound);
  183. Self::delay_for_millis(long_delay).await;
  184. // Falling through to send the result.
  185. }
  186. }
  187. }
  188. if let Err(_e) = rpc.reply_channel.send(reply) {
  189. // TODO(ditsing): log and do nothing.
  190. }
  191. }
  192. pub fn run_daemon() -> Arc<Mutex<Network>> {
  193. let mut network = Network::new();
  194. let rx = network
  195. .request_pipe
  196. .take()
  197. .expect("Newly created network should have a rx");
  198. let network = Arc::new(Mutex::new(network));
  199. let thread_pool = tokio::runtime::Builder::new()
  200. .threaded_scheduler()
  201. .core_threads(10)
  202. .max_threads(20)
  203. .thread_name("network")
  204. .enable_time()
  205. .build()
  206. .expect("Creating network thread pool should not fail");
  207. let other = network.clone();
  208. std::thread::spawn(move || {
  209. let network = other;
  210. let mut stop_timer = Instant::now();
  211. loop {
  212. // If the lock of network is unfair, we could starve threads
  213. // trying to add / remove RPC servers, or change settings.
  214. // Having a shutdown delay helps minimise lock holding.
  215. if stop_timer.elapsed() >= Self::SHUTDOWN_DELAY {
  216. let mut locked_network = network
  217. .lock()
  218. .expect("Network mutex should not be poisoned");
  219. if !locked_network.keep_running {
  220. locked_network.is_running = false;
  221. break;
  222. }
  223. locked_network.is_running = true;
  224. stop_timer = Instant::now();
  225. }
  226. match rx.try_recv() {
  227. Ok(rpc) => {
  228. thread_pool
  229. .spawn(Self::serve_rpc(network.clone(), rpc));
  230. }
  231. // All senders have disconnected. This should never happen,
  232. // since the network instance itself holds a sender.
  233. Err(TryRecvError::Disconnected) => break,
  234. Err(TryRecvError::Empty) => {
  235. std::thread::sleep(Self::SHUTDOWN_DELAY)
  236. }
  237. }
  238. }
  239. // rx is dropped here, all clients should get disconnected error
  240. // and stop sending messages.
  241. });
  242. network
  243. }
  244. }
  245. impl Network {
  246. fn increase_rpc_count(&self) {
  247. self.rpc_count.set(self.rpc_count.get() + 1);
  248. }
  249. fn new() -> Self {
  250. // The channel has infinite buffer, could OOM the server if there are
  251. // too many pending RPCs to be served.
  252. let (tx, rx) = channel();
  253. Network {
  254. reliable: false,
  255. long_delays: false,
  256. long_reordering: false,
  257. clients: Default::default(),
  258. servers: Default::default(),
  259. request_bus: tx,
  260. request_pipe: Some(rx),
  261. keep_running: true,
  262. is_running: false,
  263. rpc_count: std::cell::Cell::new(0),
  264. }
  265. }
  266. }
  #[cfg(test)]
  mod tests {
      use std::sync::MutexGuard;

      use crate::test_utils::{junk_server::make_server, make_echo_rpc};
      use crate::Result;

      use super::*;

      fn make_network() -> Network {
          Network::new()
      }

      fn unlock<T>(network: &Arc<Mutex<T>>) -> MutexGuard<T> {
          network
              .lock()
              .expect("Network mutex should not be poisoned")
      }

      /// Polls until the daemon's `is_running` flag equals `running`.
      fn wait_until_running(network: &Arc<Mutex<Network>>, running: bool) {
          while unlock(network).is_running() != running {
              std::thread::sleep(Network::SHUTDOWN_DELAY)
          }
      }

      #[test]
      fn test_rpc_count_works() {
          let network = make_network();
          assert_eq!(0, network.get_total_rpc_count());
          network.increase_rpc_count();
          assert_eq!(1, network.get_total_rpc_count());
      }

      #[test]
      fn test_dispatch_errors() {
          let mut network = make_network();
          let client: ClientIdentifier = "test-client".into();
          let error_kind = |network: &Network, client: &ClientIdentifier| {
              network.dispatch(client).err().map(|e| e.kind())
          };

          // Never registered.
          assert_eq!(
              error_kind(&network, &client),
              Some(std::io::ErrorKind::NotConnected)
          );

          // Registered, but the server does not exist yet.
          let _client = network.make_client("test-client", "test-server");
          assert_eq!(
              error_kind(&network, &client),
              Some(std::io::ErrorKind::NotFound)
          );

          // Disabled clients are rejected before the server lookup.
          network.set_enable_client(&client, false);
          assert_eq!(
              error_kind(&network, &client),
              Some(std::io::ErrorKind::BrokenPipe)
          );

          network.set_enable_client(&client, true);
          network.add_server("test-server".to_string(), make_server());
          assert!(network.dispatch(&client).is_ok());
      }

      #[test]
      fn test_network_shutdown() {
          let network = Network::run_daemon();
          wait_until_running(&network, true);
          let sender = {
              let mut network = unlock(&network);
              network.stop();
              network.request_bus.clone()
          };
          wait_until_running(&network, false);
          let (rpc, _) = make_echo_rpc("client", "server", &[]);
          let result = sender.send(rpc);
          assert!(
              result.is_err(),
              "Network is shutdown, requests should not be processed."
          );
      }

      #[test]
      fn test_proxy_rpc() -> Result<()> {
          let network = Network::run_daemon();
          let sender = {
              let mut network = unlock(&network);
              // Pin reliability so random drops can never fail this test.
              network.set_reliable(true);
              network.clients.insert(
                  "test-client".into(),
                  (true, "test-server".to_string()),
              );
              network.servers.insert("test-server".into(), make_server());
              network.request_bus.clone()
          };
          let (rpc, rx) =
              make_echo_rpc("test-client", "test-server", &[0x09, 0x00]);
          let result = sender.send(rpc);
          assert!(
              result.is_ok(),
              "Network is running, requests should be processed."
          );
          let reply = match futures::executor::block_on(rx) {
              Ok(reply) => reply,
              Err(e) => panic!("Future execution should not fail: {}", e),
          };
          match reply {
              Ok(reply) => assert_eq!(reply.as_ref(), &[0x00u8, 0x09u8]),
              Err(e) => panic!("Expecting echo message, got {}", e),
          }
          // Stop the daemon so its polling thread does not outlive the test.
          unlock(&network).stop();
          Ok(())
      }
  }