client.rs 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. use crossbeam_channel::Sender;
  2. use crate::{
  3. ClientIdentifier, ReplyMessage, RequestMessage, Result, RpcOnWire,
  4. ServerIdentifier,
  5. };
  6. // Client interface, used by the RPC client.
  7. pub struct Client {
  8. pub(crate) client: ClientIdentifier,
  9. pub(crate) server: ServerIdentifier,
  10. pub(crate) request_bus: Sender<RpcOnWire>,
  11. }
  12. impl Client {
  13. /// Error type and meaning
  14. /// * Not connected: The client did not have a chance to send the request
  15. /// because the network is down.
  16. /// * Permission denied: The network does not allow the client to send
  17. /// requests.
  18. /// * Broken pipe: The network no longer allows the client to send requests.
  19. /// * Not found: The network could not find the target server.
  20. /// * Invalid input: The server could not find the service / method to call.
  21. /// * Connection reset: The server received the request, but decided to stop
  22. /// responding.
  23. /// * Connection aborted: The client will not receive a reply because the
  24. /// the connection is closed by the network.
  25. pub async fn call_rpc(
  26. &self,
  27. service_method: String,
  28. request: RequestMessage,
  29. ) -> Result<ReplyMessage> {
  30. let (tx, rx) = futures::channel::oneshot::channel();
  31. let rpc = RpcOnWire {
  32. client: self.client.clone(),
  33. server: self.server.clone(),
  34. service_method,
  35. request,
  36. reply_channel: tx,
  37. };
  38. self.request_bus.send(rpc).map_err(|e| {
  39. // The receiving end has been closed. Network connection is broken.
  40. std::io::Error::new(
  41. std::io::ErrorKind::NotConnected,
  42. format!(
  43. "Cannot send rpc, client {} is disconnected. {}",
  44. self.client, e
  45. ),
  46. )
  47. })?;
  48. rx.await.map_err(|e| {
  49. std::io::Error::new(
  50. // The network closed our connection. The server might not even
  51. // get a chance to see the request.
  52. std::io::ErrorKind::ConnectionAborted,
  53. format!("Network request is dropped: {}", e),
  54. )
  55. })?
  56. }
  57. }
  58. #[cfg(test)]
  59. mod tests {
  60. use crossbeam_channel::{unbounded, Sender};
  61. use super::*;
  62. fn make_rpc_call(tx: Sender<RpcOnWire>) -> Result<ReplyMessage> {
  63. let client = Client {
  64. client: "C".into(),
  65. server: "S".into(),
  66. request_bus: tx,
  67. };
  68. let request = RequestMessage::from_static(&[0x17, 0x20]);
  69. futures::executor::block_on(client.call_rpc("hello".into(), request))
  70. }
  71. fn make_rpc_call_and_reply(
  72. reply: Result<ReplyMessage>,
  73. ) -> Result<ReplyMessage> {
  74. let (tx, rx) = unbounded();
  75. let handle = std::thread::spawn(move || make_rpc_call(tx));
  76. let rpc = rx.recv().expect("The request message should arrive");
  77. assert_eq!("C", &rpc.client);
  78. assert_eq!("S", &rpc.server);
  79. assert_eq!("hello", &rpc.service_method);
  80. assert_eq!(&[0x17, 0x20], rpc.request.as_ref());
  81. rpc.reply_channel
  82. .send(reply)
  83. .expect("The reply channel should not be closed");
  84. handle.join().expect("Rpc sending thread should succeed")
  85. }
  /// A successful reply must reach the caller byte for byte.
  #[test]
  fn test_call_rpc() -> Result<()> {
      let expected = &[0x11, 0x99];
      let canned = ReplyMessage::from_static(expected);
      let received = make_rpc_call_and_reply(Ok(canned))?;
      assert_eq!(expected, received.as_ref());
      Ok(())
  }
  /// An error sent back on the reply channel must surface with its original
  /// kind.
  #[test]
  fn test_call_rpc_remote_error() -> Result<()> {
      let remote_failure =
          std::io::Error::new(std::io::ErrorKind::AddrInUse, "");
      match make_rpc_call_and_reply(Err(remote_failure)) {
          Err(e) => assert_eq!(std::io::ErrorKind::AddrInUse, e.kind()),
          Ok(_) => panic!("Client should propagate remote error."),
      }
      Ok(())
  }
  /// Dropping the reply channel without answering must be reported as
  /// `ConnectionAborted`.
  #[test]
  fn test_call_rpc_remote_dropped() -> Result<()> {
      let (request_tx, request_rx) = unbounded();
      let caller = std::thread::spawn(move || make_rpc_call(request_tx));

      let rpc = request_rx
          .recv()
          .expect("The request message should arrive");
      // Discard the only way to answer the request.
      drop(rpc.reply_channel);

      match caller.join().expect("Rpc sending thread should succeed") {
          Err(e) => {
              assert_eq!(std::io::ErrorKind::ConnectionAborted, e.kind())
          }
          Ok(_) => panic!(
              "Client should return error. Reply channel has been dropped."
          ),
      }
      Ok(())
  }
  /// With the receiving end of the request bus already gone, the call must
  /// fail with `NotConnected`.
  #[test]
  fn test_call_rpc_not_connected() -> Result<()> {
      let (request_tx, request_rx) = unbounded();
      // Close the bus before the client ever tries to send.
      drop(request_rx);

      let caller = std::thread::spawn(move || make_rpc_call(request_tx));
      match caller.join().expect("Rpc sending thread should succeed") {
          Err(e) => assert_eq!(std::io::ErrorKind::NotConnected, e.kind()),
          Ok(_) => {
              panic!("Client should return error. request_bus has been dropped.")
          }
      }
      Ok(())
  }
  138. async fn make_rpc(client: Client) -> Result<ReplyMessage> {
  139. let request = RequestMessage::from_static(&[0x17, 0x20]);
  140. client.call_rpc("hello".into(), request).await
  141. }
  /// The future returned by `make_rpc` is created on this thread and driven
  /// to completion on another one; the reply sent from here must reach it.
  #[test]
  fn test_call_across_threads() -> Result<()> {
      let (tx, rx) = unbounded();
      let rpc_future = {
          let client = Client {
              client: "C".into(),
              server: "S".into(),
              request_bus: tx,
          };
          make_rpc(client)
      };

      // Keep the handle: without joining, a panic or an error on the other
      // thread would go unnoticed and the test could finish before the
      // future resolves.
      let handle = std::thread::spawn(move || {
          futures::executor::block_on(rpc_future)
      });

      let rpc = rx.recv().expect("The request message should arrive");
      rpc.reply_channel
          .send(Ok(Default::default()))
          .expect("The reply channel should not be closed");

      // The reply above is `Ok`, so the call on the other thread must succeed;
      // `?` fails the test if it returned an error instead.
      handle.join().expect("Rpc calling thread should not panic")?;
      Ok(())
  }
  162. }