|
|
@@ -82,6 +82,34 @@ impl Network {
|
|
|
pub fn get_total_rpc_count(&self) -> usize {
|
|
|
self.rpc_count.get()
|
|
|
}
|
|
|
+}
|
|
|
+
|
|
|
+impl Network {
|
|
|
+ async fn serve_rpc(network: Arc<Mutex<Self>>, rpc: RpcOnWire) {
|
|
|
+ let server_result = {
|
|
|
+ let network = network
|
|
|
+ .lock()
|
|
|
+ .expect("Network mutex should not be poisoned");
|
|
|
+ network.increase_rpc_count();
|
|
|
+
|
|
|
+ network.dispatch(&rpc.client)
|
|
|
+ };
|
|
|
+
|
|
|
+ // Cannot use server_result.amp() here, since there
|
|
|
+ // is an async request in the middle.
|
|
|
+ let reply = match server_result {
|
|
|
+ Ok(server) => {
|
|
|
+ // Simulates the copy from network to server.
|
|
|
+ let data = rpc.request.clone();
|
|
|
+ server.dispatch(rpc.service_method, data).await
|
|
|
+ }
|
|
|
+ Err(e) => Err(e),
|
|
|
+ };
|
|
|
+
|
|
|
+ if let Err(_e) = rpc.reply_channel.send(reply) {
|
|
|
+ // TODO(ditsing): log and do nothing.
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
pub fn run_daemon() -> Arc<Mutex<Network>> {
|
|
|
let mut network = Network::new();
|
|
|
@@ -115,34 +143,8 @@ impl Network {
|
|
|
|
|
|
match rx.try_recv() {
|
|
|
Ok(rpc) => {
|
|
|
- let network_clone = network.clone();
|
|
|
- thread_pool.spawn_ok(async move {
|
|
|
- let server_result = {
|
|
|
- let network = network_clone.lock().expect(
|
|
|
- "Network mutex should not be poisoned",
|
|
|
- );
|
|
|
- network.increase_rpc_count();
|
|
|
-
|
|
|
- network.dispatch(&rpc.client)
|
|
|
- };
|
|
|
-
|
|
|
- // Cannot use server_result.amp() here, since there
|
|
|
- // is an async request in the middle.
|
|
|
- let reply = match server_result {
|
|
|
- Ok(server) => {
|
|
|
- // Simulates the copy from network to server.
|
|
|
- let data = rpc.request.clone();
|
|
|
- server
|
|
|
- .dispatch(rpc.service_method, data)
|
|
|
- .await
|
|
|
- }
|
|
|
- Err(e) => Err(e),
|
|
|
- };
|
|
|
-
|
|
|
- if let Err(_e) = rpc.reply_channel.send(reply) {
|
|
|
- // TODO(ditsing): log and do nothing.
|
|
|
- }
|
|
|
- })
|
|
|
+ thread_pool
|
|
|
+ .spawn_ok(Self::serve_rpc(network.clone(), rpc));
|
|
|
}
|
|
|
// All senders have disconnected. This should never happen,
|
|
|
// since the network instance itself holds a sender.
|