|
@@ -1,6 +1,9 @@
|
|
|
use std::collections::HashMap;
|
|
use std::collections::HashMap;
|
|
|
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
|
|
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
|
|
|
-use std::sync::{Arc, Mutex};
|
|
|
|
|
|
|
+use std::sync::{
|
|
|
|
|
+ atomic::{AtomicBool, Ordering},
|
|
|
|
|
+ Arc, Mutex,
|
|
|
|
|
+};
|
|
|
use std::time::{Duration, Instant};
|
|
use std::time::{Duration, Instant};
|
|
|
|
|
|
|
|
use rand::{thread_rng, Rng};
|
|
use rand::{thread_rng, Rng};
|
|
@@ -26,7 +29,7 @@ pub struct Network {
|
|
|
// Closing signal.
|
|
// Closing signal.
|
|
|
keep_running: bool,
|
|
keep_running: bool,
|
|
|
// Whether the network is active or not.
|
|
// Whether the network is active or not.
|
|
|
- is_running: bool,
|
|
|
|
|
|
|
+ stopped: AtomicBool,
|
|
|
|
|
|
|
|
// RPC Counter, using Cell for interior mutability.
|
|
// RPC Counter, using Cell for interior mutability.
|
|
|
rpc_count: std::cell::Cell<usize>,
|
|
rpc_count: std::cell::Cell<usize>,
|
|
@@ -49,8 +52,8 @@ impl Network {
|
|
|
self.keep_running = false;
|
|
self.keep_running = false;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- pub fn is_running(&self) -> bool {
|
|
|
|
|
- self.is_running
|
|
|
|
|
|
|
+ pub fn stopped(&self) -> bool {
|
|
|
|
|
+ self.stopped.load(Ordering::Acquire)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
pub fn make_client<C: Into<ClientIdentifier>, S: Into<ServerIdentifier>>(
|
|
pub fn make_client<C: Into<ClientIdentifier>, S: Into<ServerIdentifier>>(
|
|
@@ -249,14 +252,12 @@ impl Network {
|
|
|
// trying to add / remove RPC servers, or change settings.
|
|
// trying to add / remove RPC servers, or change settings.
|
|
|
// Having a shutdown delay helps minimise lock holding.
|
|
// Having a shutdown delay helps minimise lock holding.
|
|
|
if stop_timer.elapsed() >= Self::SHUTDOWN_DELAY {
|
|
if stop_timer.elapsed() >= Self::SHUTDOWN_DELAY {
|
|
|
- let mut locked_network = network
|
|
|
|
|
|
|
+ let locked_network = network
|
|
|
.lock()
|
|
.lock()
|
|
|
.expect("Network mutex should not be poisoned");
|
|
.expect("Network mutex should not be poisoned");
|
|
|
if !locked_network.keep_running {
|
|
if !locked_network.keep_running {
|
|
|
- locked_network.is_running = false;
|
|
|
|
|
break;
|
|
break;
|
|
|
}
|
|
}
|
|
|
- locked_network.is_running = true;
|
|
|
|
|
stop_timer = Instant::now();
|
|
stop_timer = Instant::now();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -274,8 +275,18 @@ impl Network {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // Shutdown might leak outstanding tasks if timed-out.
|
|
|
|
|
+ thread_pool.shutdown_timeout(Self::SHUTDOWN_DELAY);
|
|
|
|
|
+
|
|
|
// rx is dropped here, all clients should get disconnected error
|
|
// rx is dropped here, all clients should get disconnected error
|
|
|
// and stop sending messages.
|
|
// and stop sending messages.
|
|
|
|
|
+ drop(rx);
|
|
|
|
|
+
|
|
|
|
|
+ network
|
|
|
|
|
+ .lock()
|
|
|
|
|
+ .expect("Network mutex should not be poisoned")
|
|
|
|
|
+ .stopped
|
|
|
|
|
+ .store(true, Ordering::Release);
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
network
|
|
network
|
|
@@ -300,7 +311,7 @@ impl Network {
|
|
|
request_bus: tx,
|
|
request_bus: tx,
|
|
|
request_pipe: Some(rx),
|
|
request_pipe: Some(rx),
|
|
|
keep_running: true,
|
|
keep_running: true,
|
|
|
- is_running: false,
|
|
|
|
|
|
|
+ stopped: Default::default(),
|
|
|
rpc_count: std::cell::Cell::new(0),
|
|
rpc_count: std::cell::Cell::new(0),
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -342,9 +353,6 @@ mod tests {
|
|
|
#[test]
|
|
#[test]
|
|
|
fn test_network_shutdown() {
|
|
fn test_network_shutdown() {
|
|
|
let network = Network::run_daemon();
|
|
let network = Network::run_daemon();
|
|
|
- while !unlock(&network).is_running() {
|
|
|
|
|
- std::thread::sleep(Network::SHUTDOWN_DELAY)
|
|
|
|
|
- }
|
|
|
|
|
let sender = {
|
|
let sender = {
|
|
|
let mut network = unlock(&network);
|
|
let mut network = unlock(&network);
|
|
|
|
|
|
|
@@ -352,7 +360,7 @@ mod tests {
|
|
|
|
|
|
|
|
network.request_bus.clone()
|
|
network.request_bus.clone()
|
|
|
};
|
|
};
|
|
|
- while unlock(&network).is_running() {
|
|
|
|
|
|
|
+ while !unlock(&network).stopped() {
|
|
|
std::thread::sleep(Network::SHUTDOWN_DELAY)
|
|
std::thread::sleep(Network::SHUTDOWN_DELAY)
|
|
|
}
|
|
}
|
|
|
let (rpc, _) = make_echo_rpc("client", "server", &[]);
|
|
let (rpc, _) = make_echo_rpc("client", "server", &[]);
|