client.rs 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. use std::sync::mpsc::Sender;
  2. use crate::{
  3. ClientIdentifier, ReplyMessage, RequestMessage, Result, RpcOnWire,
  4. ServerIdentifier,
  5. };
  6. // Client interface, used by the RPC client.
  7. pub struct Client {
  8. pub(crate) client: ClientIdentifier,
  9. pub(crate) server: ServerIdentifier,
  10. pub(crate) request_bus: Sender<RpcOnWire>,
  11. }
  12. impl Client {
  13. /// Error type and meaning
  14. /// * Not connected: The client did not have a chance to send the request
  15. /// because the network is down.
  16. /// * Permission denied: The network does not allow the client to send
  17. /// requests.
  18. /// * Broken pipe: The network no longer allows the client to send requests.
  19. /// * Not found: The network could not find the target server.
  20. /// * Invalid input: The server could not find the service / method to call.
  21. /// * Connection reset: The server received the request, but decided to stop
  22. /// responding.
  23. /// * Connection aborted: The client will not receive a reply because the
  24. /// the connection is closed by the network.
  25. pub async fn call_rpc(
  26. &self,
  27. service_method: String,
  28. request: RequestMessage,
  29. ) -> Result<ReplyMessage> {
  30. let (tx, rx) = futures::channel::oneshot::channel();
  31. let rpc = RpcOnWire {
  32. client: self.client.clone(),
  33. server: self.server.clone(),
  34. service_method,
  35. request,
  36. reply_channel: tx,
  37. };
  38. self.request_bus.send(rpc).map_err(|e| {
  39. // The receiving end has been closed. Network connection is broken.
  40. std::io::Error::new(
  41. std::io::ErrorKind::NotConnected,
  42. format!(
  43. "Cannot send rpc, client {} is disconnected. {}",
  44. self.client.clone(),
  45. e
  46. ),
  47. )
  48. })?;
  49. rx.await.map_err(|e| {
  50. std::io::Error::new(
  51. // The network closed our connection. The server might not even
  52. // get a chance to see the request.
  53. std::io::ErrorKind::ConnectionAborted,
  54. format!("Network request is dropped: {}", e),
  55. )
  56. })?
  57. }
  58. }
  #[cfg(test)]
  mod tests {
      use std::io::{Error, ErrorKind};
      use std::sync::mpsc::{channel, Sender};

      use super::*;

      /// Builds a client ("C" -> "S") on top of `tx`, issues a single "hello"
      /// RPC and blocks the current thread until the call resolves.
      fn make_rpc_call(tx: Sender<RpcOnWire>) -> Result<ReplyMessage> {
          let request = RequestMessage::from_static(&[0x17, 0x20]);
          let client = Client {
              client: "C".into(),
              server: "S".into(),
              request_bus: tx,
          };
          let call = client.call_rpc("hello".into(), request);
          futures::executor::block_on(call)
      }

      /// Runs `make_rpc_call` on a helper thread, checks the request that
      /// shows up on the bus, answers it with `reply`, and returns whatever
      /// the client call produced.
      fn make_rpc_call_and_reply(
          reply: Result<ReplyMessage>,
      ) -> Result<ReplyMessage> {
          let (tx, rx) = channel();
          let caller = std::thread::spawn(move || make_rpc_call(tx));

          let on_wire = rx.recv().expect("The request message should arrive");
          assert_eq!("C", &on_wire.client);
          assert_eq!("S", &on_wire.server);
          assert_eq!("hello", &on_wire.service_method);
          assert_eq!(&[0x17, 0x20], on_wire.request.as_ref());

          let delivered = on_wire.reply_channel.send(reply);
          delivered.expect("The reply channel should not be closed");

          caller.join().expect("Rpc sending thread should succeed")
      }

      /// A successful reply is handed back to the caller unchanged.
      #[test]
      fn test_call_rpc() -> Result<()> {
          let data = &[0x11, 0x99];
          let reply =
              make_rpc_call_and_reply(Ok(ReplyMessage::from_static(data)))?;
          assert_eq!(data, reply.as_ref());
          Ok(())
      }

      /// An error sent over the reply channel keeps its kind.
      #[test]
      fn test_call_rpc_remote_error() -> Result<()> {
          let remote_failure = Error::new(ErrorKind::AddrInUse, "");
          match make_rpc_call_and_reply(Err(remote_failure)) {
              Err(e) => assert_eq!(std::io::ErrorKind::AddrInUse, e.kind()),
              Ok(_) => panic!("Client should propagate remote error."),
          }
          Ok(())
      }

      /// Dropping the reply channel without answering yields
      /// `ConnectionAborted`.
      #[test]
      fn test_call_rpc_remote_dropped() -> Result<()> {
          let (tx, rx) = channel();
          let caller = std::thread::spawn(move || make_rpc_call(tx));

          let on_wire = rx.recv().expect("The request message should arrive");
          drop(on_wire.reply_channel);

          match caller.join().expect("Rpc sending thread should succeed") {
              Err(e) => {
                  assert_eq!(std::io::ErrorKind::ConnectionAborted, e.kind())
              }
              Ok(_) => panic!(
                  "Client should return error. Reply channel has been dropped."
              ),
          }
          Ok(())
      }

      /// A closed request bus yields `NotConnected`.
      #[test]
      fn test_call_rpc_not_connected() -> Result<()> {
          let (tx, rx) = channel();
          // Close the receiving end before the client gets to send anything.
          drop(rx);

          let caller = std::thread::spawn(move || make_rpc_call(tx));
          match caller.join().expect("Rpc sending thread should succeed") {
              Err(e) => assert_eq!(std::io::ErrorKind::NotConnected, e.kind()),
              Ok(_) => {
                  panic!("Client should return error. request_bus has been dropped.")
              }
          }
          Ok(())
      }
  }