raft_service.rs 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. use std::future::Future;
  2. use std::net::SocketAddr;
  3. use std::sync::Arc;
  4. use async_trait::async_trait;
  5. use tarpc::context::Context;
  6. use kvraft::UniqueKVOp;
  7. use ruaft::{
  8. AppendEntriesArgs, AppendEntriesReply, InstallSnapshotArgs,
  9. InstallSnapshotReply, Raft, RemoteRaft, RequestVoteArgs, RequestVoteReply,
  10. };
  11. #[tarpc::service]
  12. pub(crate) trait RaftService {
  13. async fn append_entries(
  14. args: AppendEntriesArgs<UniqueKVOp>,
  15. ) -> AppendEntriesReply;
  16. async fn install_snapshot(
  17. args: InstallSnapshotArgs,
  18. ) -> InstallSnapshotReply;
  19. async fn request_vote(args: RequestVoteArgs) -> RequestVoteReply;
  20. }
  21. #[derive(Clone)]
  22. struct RaftRpcServer(Arc<Raft<UniqueKVOp>>);
  23. #[tarpc::server]
  24. impl RaftService for RaftRpcServer {
  25. async fn append_entries(
  26. self,
  27. _context: Context,
  28. args: AppendEntriesArgs<UniqueKVOp>,
  29. ) -> AppendEntriesReply {
  30. self.0.process_append_entries(args)
  31. }
  32. async fn install_snapshot(
  33. self,
  34. _context: Context,
  35. args: InstallSnapshotArgs,
  36. ) -> InstallSnapshotReply {
  37. self.0.process_install_snapshot(args)
  38. }
  39. async fn request_vote(
  40. self,
  41. _context: Context,
  42. args: RequestVoteArgs,
  43. ) -> RequestVoteReply {
  44. self.0.process_request_vote(args)
  45. }
  46. }
  47. pub(crate) struct LazyRaftServiceClient {
  48. socket_addr: SocketAddr,
  49. once_cell: tokio::sync::OnceCell<RaftServiceClient>,
  50. }
  51. impl LazyRaftServiceClient {
  52. pub(crate) fn new(socket_addr: SocketAddr) -> Self {
  53. Self {
  54. socket_addr,
  55. once_cell: tokio::sync::OnceCell::new(),
  56. }
  57. }
  58. pub(crate) async fn get_or_try_init(
  59. &self,
  60. ) -> std::io::Result<&RaftServiceClient> {
  61. self.once_cell
  62. .get_or_try_init(|| connect_to_raft_service(self.socket_addr))
  63. .await
  64. }
  65. }
  66. #[async_trait]
  67. impl RemoteRaft<UniqueKVOp> for LazyRaftServiceClient {
  68. async fn request_vote(
  69. &self,
  70. args: RequestVoteArgs,
  71. ) -> std::io::Result<RequestVoteReply> {
  72. self.get_or_try_init()
  73. .await?
  74. .request_vote(Context::current(), args)
  75. .await
  76. .map_err(crate::utils::translate_rpc_error)
  77. }
  78. async fn append_entries(
  79. &self,
  80. args: AppendEntriesArgs<UniqueKVOp>,
  81. ) -> std::io::Result<AppendEntriesReply> {
  82. self.get_or_try_init()
  83. .await?
  84. .append_entries(Context::current(), args)
  85. .await
  86. .map_err(crate::utils::translate_rpc_error)
  87. }
  88. async fn install_snapshot(
  89. &self,
  90. args: InstallSnapshotArgs,
  91. ) -> std::io::Result<InstallSnapshotReply> {
  92. self.get_or_try_init()
  93. .await?
  94. .install_snapshot(Context::current(), args)
  95. .await
  96. .map_err(crate::utils::translate_rpc_error)
  97. }
  98. }
  99. pub(crate) async fn connect_to_raft_service(
  100. addr: SocketAddr,
  101. ) -> std::io::Result<RaftServiceClient> {
  102. let conn = tarpc::serde_transport::tcp::connect(
  103. addr,
  104. tokio_serde::formats::Json::default,
  105. )
  106. .await?;
  107. let client =
  108. RaftServiceClient::new(tarpc::client::Config::default(), conn).spawn();
  109. Ok(client)
  110. }
  111. pub(crate) fn start_raft_service_server(
  112. addr: SocketAddr,
  113. raft: Arc<Raft<UniqueKVOp>>,
  114. ) -> impl Future<Output = std::io::Result<()>> {
  115. let server = RaftRpcServer(raft);
  116. crate::utils::start_tarpc_server(addr, server.serve())
  117. }