raft_service.rs 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. use std::future::Future;
  2. use std::net::SocketAddr;
  3. use async_trait::async_trait;
  4. use tarpc::context::Context;
  5. use kvraft::UniqueKVOp;
  6. use ruaft::{
  7. AppendEntriesArgs, AppendEntriesReply, InstallSnapshotArgs,
  8. InstallSnapshotReply, Raft, RemoteRaft, RequestVoteArgs, RequestVoteReply,
  9. };
  10. #[tarpc::service]
  11. pub(crate) trait RaftService {
  12. async fn append_entries(
  13. args: AppendEntriesArgs<UniqueKVOp>,
  14. ) -> AppendEntriesReply;
  15. async fn install_snapshot(
  16. args: InstallSnapshotArgs,
  17. ) -> InstallSnapshotReply;
  18. async fn request_vote(args: RequestVoteArgs) -> RequestVoteReply;
  19. }
  20. #[derive(Clone)]
  21. struct RaftRpcServer(Raft<UniqueKVOp>);
  22. #[tarpc::server]
  23. impl RaftService for RaftRpcServer {
  24. async fn append_entries(
  25. self,
  26. _context: Context,
  27. args: AppendEntriesArgs<UniqueKVOp>,
  28. ) -> AppendEntriesReply {
  29. self.0.process_append_entries(args)
  30. }
  31. async fn install_snapshot(
  32. self,
  33. _context: Context,
  34. args: InstallSnapshotArgs,
  35. ) -> InstallSnapshotReply {
  36. self.0.process_install_snapshot(args)
  37. }
  38. async fn request_vote(
  39. self,
  40. _context: Context,
  41. args: RequestVoteArgs,
  42. ) -> RequestVoteReply {
  43. self.0.process_request_vote(args)
  44. }
  45. }
  46. pub(crate) struct LazyRaftServiceClient {
  47. socket_addr: SocketAddr,
  48. once_cell: tokio::sync::OnceCell<RaftServiceClient>,
  49. }
  50. impl LazyRaftServiceClient {
  51. pub(crate) fn new(socket_addr: SocketAddr) -> Self {
  52. Self {
  53. socket_addr,
  54. once_cell: tokio::sync::OnceCell::new(),
  55. }
  56. }
  57. pub(crate) async fn get_or_try_init(
  58. &self,
  59. ) -> std::io::Result<&RaftServiceClient> {
  60. self.once_cell
  61. .get_or_try_init(|| connect_to_raft_service(self.socket_addr))
  62. .await
  63. }
  64. }
  65. #[async_trait]
  66. impl RemoteRaft<UniqueKVOp> for LazyRaftServiceClient {
  67. async fn request_vote(
  68. &self,
  69. args: RequestVoteArgs,
  70. ) -> std::io::Result<RequestVoteReply> {
  71. self.get_or_try_init()
  72. .await?
  73. .request_vote(crate::utils::context(), args)
  74. .await
  75. .map_err(crate::utils::translate_rpc_error)
  76. }
  77. async fn append_entries(
  78. &self,
  79. args: AppendEntriesArgs<UniqueKVOp>,
  80. ) -> std::io::Result<AppendEntriesReply> {
  81. self.get_or_try_init()
  82. .await?
  83. .append_entries(crate::utils::context(), args)
  84. .await
  85. .map_err(crate::utils::translate_rpc_error)
  86. }
  87. async fn install_snapshot(
  88. &self,
  89. args: InstallSnapshotArgs,
  90. ) -> std::io::Result<InstallSnapshotReply> {
  91. self.get_or_try_init()
  92. .await?
  93. .install_snapshot(crate::utils::context(), args)
  94. .await
  95. .map_err(crate::utils::translate_rpc_error)
  96. }
  97. }
  98. pub(crate) async fn connect_to_raft_service(
  99. addr: SocketAddr,
  100. ) -> std::io::Result<RaftServiceClient> {
  101. let conn = tarpc::serde_transport::tcp::connect(
  102. addr,
  103. tokio_serde::formats::Json::default,
  104. )
  105. .await?;
  106. let client =
  107. RaftServiceClient::new(tarpc::client::Config::default(), conn).spawn();
  108. Ok(client)
  109. }
  110. pub(crate) fn start_raft_service_server(
  111. addr: SocketAddr,
  112. raft: Raft<UniqueKVOp>,
  113. ) -> impl Future<Output = std::io::Result<()>> {
  114. let server = RaftRpcServer(raft);
  115. crate::utils::start_tarpc_server(addr, server.serve())
  116. }