one_clerk.rs 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. use std::net::SocketAddr;
  2. use std::sync::atomic::{AtomicBool, Ordering};
  3. use std::sync::mpsc::Sender;
  4. use std::sync::Arc;
  5. use crate::kv_service::connect_to_kv_service;
  6. use kvraft::Clerk;
  7. pub(crate) fn create_clerk(socket_addrs: Vec<SocketAddr>) -> OneClerk {
  8. OneClerk::create(socket_addrs)
  9. }
  10. #[derive(Clone)]
  11. pub(crate) struct OneClerk {
  12. ready: Arc<AtomicBool>,
  13. requests: crossbeam_channel::Sender<(ClerkRequest, Sender<String>)>,
  14. }
  15. enum ClerkRequest {
  16. Get(String),
  17. Put(String, String),
  18. Append(String, String),
  19. }
  20. impl OneClerk {
  21. pub(crate) fn create(socket_addrs: Vec<SocketAddr>) -> Self {
  22. let ready = Arc::new(AtomicBool::new(false));
  23. // Create a thread that blocks on all requests to the clerk.
  24. let requests = Self::run_clerk_thread(socket_addrs, ready.clone());
  25. OneClerk { ready, requests }
  26. }
  27. async fn initialize_clerk(socket_addrs: Vec<SocketAddr>) -> Clerk {
  28. log::info!("Starting clerk creation ...");
  29. let mut clients = vec![None; socket_addrs.len()];
  30. while clients.iter().filter(|e| e.is_none()).count() != 0 {
  31. for (index, socket_addr) in socket_addrs.iter().enumerate() {
  32. let result = connect_to_kv_service(socket_addr.clone()).await;
  33. match result {
  34. Ok(client) => clients[index] = Some(client),
  35. Err(e) => log::error!(
  36. "Error connecting to {:?}: {}",
  37. socket_addr,
  38. e
  39. ),
  40. }
  41. }
  42. log::info!("Clerk clients are {:?}", clients);
  43. }
  44. log::info!("Done clerk creation ...");
  45. let clients = clients.into_iter().map(|e| e.unwrap()).collect();
  46. Clerk::new(clients)
  47. }
  48. /// A thread must be created for get requests. We cannot run the blocking
  49. /// Clerk functions on tokio thread pool threads.
  50. fn run_clerk_thread(
  51. socket_addrs: Vec<SocketAddr>,
  52. ready: Arc<AtomicBool>,
  53. ) -> crossbeam_channel::Sender<(ClerkRequest, Sender<String>)> {
  54. let local_logger =
  55. test_utils::thread_local_logger::LocalLogger::inherit();
  56. // Steal a tokio runtime to run the initializer.
  57. let tokio_handle = tokio::runtime::Handle::current();
  58. let (tx, rx) =
  59. crossbeam_channel::unbounded::<(ClerkRequest, Sender<String>)>();
  60. std::thread::spawn(move || {
  61. local_logger.attach();
  62. let mut clerk =
  63. tokio_handle.block_on(Self::initialize_clerk(socket_addrs));
  64. clerk.init_once();
  65. ready.store(true, Ordering::Release);
  66. while let Ok((request, result)) = rx.recv() {
  67. let value = match request {
  68. ClerkRequest::Get(key) => {
  69. clerk.get(key).unwrap_or_default()
  70. }
  71. ClerkRequest::Put(key, value) => {
  72. clerk.put(key, value);
  73. String::default()
  74. }
  75. ClerkRequest::Append(key, value) => {
  76. clerk.append(key, value);
  77. String::default()
  78. }
  79. };
  80. let _ = result.send(value);
  81. }
  82. });
  83. return tx;
  84. }
  85. fn request(&self, request: ClerkRequest) -> Option<String> {
  86. if !self.ready.load(Ordering::Acquire) {
  87. return None;
  88. }
  89. let (result_tx, result_rx) = std::sync::mpsc::channel();
  90. self.requests
  91. .send((request, result_tx))
  92. .expect("Send get request should not fail");
  93. let value = result_rx
  94. .recv()
  95. .expect("Receiving get response should not fail");
  96. Some(value)
  97. }
  98. pub(crate) fn get(&self, key: String) -> Option<String> {
  99. self.request(ClerkRequest::Get(key))
  100. }
  101. pub(crate) fn put(&self, key: String, value: String) -> Option<String> {
  102. self.request(ClerkRequest::Put(key, value))
  103. }
  104. pub(crate) fn append(&self, key: String, value: String) -> Option<String> {
  105. self.request(ClerkRequest::Append(key, value))
  106. }
  107. }