kv_service.rs 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. use std::future::Future;
  2. use std::net::SocketAddr;
  3. use std::sync::Arc;
  4. use async_trait::async_trait;
  5. use tarpc::context::Context;
  6. use kvraft::{
  7. GetArgs, GetReply, KVServer, PutAppendArgs, PutAppendReply, RemoteKvraft,
  8. };
  9. #[tarpc::service]
  10. pub(crate) trait KVService {
  11. async fn get(args: GetArgs) -> GetReply;
  12. async fn put_append(args: PutAppendArgs) -> PutAppendReply;
  13. }
  14. #[derive(Clone)]
  15. struct KVRpcServer(Arc<KVServer>);
  16. #[tarpc::server]
  17. impl KVService for KVRpcServer {
  18. async fn get(self, _context: Context, args: GetArgs) -> GetReply {
  19. self.0.get(args).await
  20. }
  21. async fn put_append(
  22. self,
  23. _context: Context,
  24. args: PutAppendArgs,
  25. ) -> PutAppendReply {
  26. self.0.put_append(args).await
  27. }
  28. }
  29. #[async_trait]
  30. impl RemoteKvraft for KVServiceClient {
  31. async fn get(&self, args: GetArgs) -> std::io::Result<GetReply> {
  32. self.get(Context::current(), args)
  33. .await
  34. .map_err(crate::utils::translate_rpc_error)
  35. }
  36. async fn put_append(
  37. &self,
  38. args: PutAppendArgs,
  39. ) -> std::io::Result<PutAppendReply> {
  40. self.put_append(Context::current(), args)
  41. .await
  42. .map_err(crate::utils::translate_rpc_error)
  43. }
  44. }
  45. #[allow(dead_code)]
  46. pub(crate) async fn connect_to_kv_service(
  47. addr: SocketAddr,
  48. ) -> std::io::Result<KVServiceClient> {
  49. let conn = tarpc::serde_transport::tcp::connect(
  50. addr,
  51. tokio_serde::formats::Json::default,
  52. )
  53. .await?;
  54. let client =
  55. KVServiceClient::new(tarpc::client::Config::default(), conn).spawn();
  56. Ok(client)
  57. }
  58. pub(crate) fn start_kv_service_server(
  59. addr: SocketAddr,
  60. kv_server: Arc<KVServer>,
  61. ) -> impl Future<Output = std::io::Result<()>> {
  62. let server = KVRpcServer(kv_server);
  63. crate::utils::start_tarpc_server(addr, server.serve())
  64. }