// main.rs (4.8 KB)

  1. use std::net::SocketAddr;
  2. use std::sync::atomic::{AtomicUsize, Ordering};
  3. use std::sync::Arc;
  4. use kvraft::KVServer;
  5. use lazy_static::lazy_static;
  6. use serde_derive::{Deserialize, Serialize};
  7. use warp::Filter;
  8. use crate::one_clerk::create_clerk;
  9. use crate::run::run_kv_instance;
  10. mod kv_service;
  11. mod one_clerk;
  12. mod persister;
  13. mod raft_service;
  14. mod run;
  15. mod utils;
  16. #[derive(Deserialize, Serialize)]
  17. struct PutAppendBody {
  18. key: String,
  19. value: String,
  20. }
  21. const IP_ONE: [u8; 4] = [10, 1, 1, 198];
  22. const IP_TWO: [u8; 4] = [10, 1, 1, 51];
  23. const IP_THREE: [u8; 4] = [10, 1, 1, 56];
  24. lazy_static! {
  25. static ref KV_ADDRS: Vec<SocketAddr> = vec![
  26. (IP_ONE, 9986).into(),
  27. (IP_TWO, 9987).into(),
  28. (IP_THREE, 9988).into(),
  29. ];
  30. static ref RAFT_ADDRS: Vec<SocketAddr> = vec![
  31. (IP_ONE, 10006).into(),
  32. (IP_TWO, 10007).into(),
  33. (IP_THREE, 10008).into(),
  34. ];
  35. static ref WEB_ADDRS: Vec<SocketAddr> = vec![
  36. ([0, 0, 0, 0], 9006).into(),
  37. ([0, 0, 0, 0], 9007).into(),
  38. ([0, 0, 0, 0], 9008).into(),
  39. ];
  40. }
  41. const NOT_READY: &str = "Clerk is not ready";
  42. async fn run_web_server(socket_addr: SocketAddr, kv_server: Arc<KVServer>) {
  43. let is_leader = warp::get()
  44. .and(warp::path!("kvstore" / "is_leader"))
  45. .map(move || format!("{:?}", kv_server.raft().get_state()));
  46. let counter = Arc::new(AtomicUsize::new(0));
  47. let counter_2 = counter.clone();
  48. let counter_3 = counter.clone();
  49. let clerk = create_clerk(KV_ADDRS.clone());
  50. let get_clerk = clerk.clone();
  51. let get = warp::get()
  52. .and(warp::path!("kvstore" / "get" / String))
  53. .map(move |key: String| {
  54. let counter = counter.fetch_add(1, Ordering::SeqCst).to_string();
  55. match get_clerk.get(key.clone()) {
  56. Some(value) => {
  57. key + "!" + counter.as_str() + "!" + value.as_str()
  58. }
  59. None => NOT_READY.to_string(),
  60. }
  61. });
  62. let put_clerk = clerk.clone();
  63. let put = warp::post()
  64. .and(warp::path!("kvstore" / "put"))
  65. .and(warp::body::json())
  66. .map(move |body: PutAppendBody| {
  67. counter_2.fetch_add(1, Ordering::SeqCst);
  68. let code = match put_clerk.put(body.key, body.value) {
  69. None => warp::http::StatusCode::SERVICE_UNAVAILABLE,
  70. Some(_) => warp::http::StatusCode::OK,
  71. };
  72. warp::reply::with_status(warp::reply(), code)
  73. });
  74. let append_clerk = clerk.clone();
  75. let append = warp::post()
  76. .and(warp::path!("kvstore" / "append"))
  77. .and(warp::body::json())
  78. .map(move |body: PutAppendBody| {
  79. counter_3.fetch_add(1, Ordering::SeqCst);
  80. let code = match append_clerk.append(body.key, body.value) {
  81. None => warp::http::StatusCode::SERVICE_UNAVAILABLE,
  82. Some(_) => warp::http::StatusCode::OK,
  83. };
  84. warp::reply::with_status(warp::reply(), code)
  85. });
  86. let routes = is_leader.or(get).or(put).or(append);
  87. warp::serve(routes).run(socket_addr).await;
  88. }
  89. fn main() {
  90. let me: usize = std::env::args()
  91. .nth(1)
  92. .unwrap_or_default()
  93. .parse()
  94. .expect("An index of the current instance must be passed in");
  95. test_utils::init_log(format!("durio-instance-{}", me).as_str())
  96. .expect("Initiating log should not fail");
  97. // Run RPC servers in a thread pool. This pool
  98. // 1. Accepts incoming RPC connections for KV and Raft servers.
  99. // 2. Sends out RPCs to other Raft instances.
  100. // Timers are used by RPC handling code in the KV server.
  101. let local_logger = test_utils::thread_local_logger::get();
  102. let rpc_server_thread_pool = tokio::runtime::Builder::new_multi_thread()
  103. .enable_all()
  104. .thread_name("durio-rpc")
  105. .on_thread_start(move || {
  106. test_utils::thread_local_logger::set(local_logger.clone())
  107. })
  108. .build()
  109. .expect("Creating thread pool should not fail");
  110. let kv_server = rpc_server_thread_pool.block_on(async {
  111. run_kv_instance(KV_ADDRS[me], RAFT_ADDRS.clone(), me)
  112. .await
  113. .expect("Running kv instance should not fail")
  114. });
  115. // Run web servers in a thread pool. This pool
  116. // 1. Accepts incoming HTTP connections.
  117. // 2. Sends out RPCs to KV instances, both local and remote.
  118. let local_logger = test_utils::thread_local_logger::get();
  119. let thread_pool = tokio::runtime::Builder::new_multi_thread()
  120. .enable_all()
  121. .thread_name("durio-web")
  122. .on_thread_start(move || {
  123. test_utils::thread_local_logger::set(local_logger.clone())
  124. })
  125. .build()
  126. .expect("Creating thread pool should not fail");
  127. thread_pool.block_on(run_web_server(WEB_ADDRS[me], kv_server));
  128. }