main.rs 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. use std::convert::Infallible;
  2. use std::net::SocketAddr;
  3. use std::sync::atomic::{AtomicUsize, Ordering};
  4. use std::sync::Arc;
  5. use axum::extract::{Json, Path};
  6. use lazy_static::lazy_static;
  7. use serde_derive::{Deserialize, Serialize};
  8. use kvraft::KVServer;
  9. use crate::kv_service::create_async_clerk;
  10. use crate::run::run_kv_instance;
  11. mod kv_service;
  12. mod persister;
  13. mod raft_service;
  14. mod run;
  15. mod utils;
  16. #[derive(Deserialize, Serialize)]
  17. struct PutAppendBody {
  18. key: String,
  19. value: String,
  20. }
  21. const IP_ONE: [u8; 4] = [10, 1, 1, 198];
  22. const IP_TWO: [u8; 4] = [10, 1, 1, 51];
  23. const IP_THREE: [u8; 4] = [10, 1, 1, 56];
  24. lazy_static! {
  25. static ref KV_ADDRS: Vec<SocketAddr> = vec![
  26. (IP_ONE, 9986).into(),
  27. (IP_TWO, 9987).into(),
  28. (IP_THREE, 9988).into(),
  29. ];
  30. static ref RAFT_ADDRS: Vec<SocketAddr> = vec![
  31. (IP_ONE, 10006).into(),
  32. (IP_TWO, 10007).into(),
  33. (IP_THREE, 10008).into(),
  34. ];
  35. static ref WEB_ADDRS: Vec<SocketAddr> = vec![
  36. ([0, 0, 0, 0], 9006).into(),
  37. ([0, 0, 0, 0], 9007).into(),
  38. ([0, 0, 0, 0], 9008).into(),
  39. ];
  40. }
  41. async fn run_web_server(
  42. socket_addr: SocketAddr,
  43. kv_server: Arc<KVServer>,
  44. kv_addrs: Vec<SocketAddr>,
  45. ) {
  46. let app = axum::Router::new();
  47. let app = app.route(
  48. "/kvstore/is_leader",
  49. axum::routing::get(move || {
  50. let kv_server = kv_server.clone();
  51. async move {
  52. let ret: Result<String, Infallible> =
  53. Ok(format!("{:?}", kv_server.raft().get_state()));
  54. ret
  55. }
  56. }),
  57. );
  58. let clerk = Arc::new(create_async_clerk(kv_addrs).await);
  59. let counter = Arc::new(AtomicUsize::new(0));
  60. let get_clerk = clerk.clone();
  61. let app = app.route(
  62. "/kvstore/get/:key",
  63. axum::routing::get(move |Path(key): Path<String>| {
  64. let counter = counter.fetch_add(1, Ordering::Relaxed).to_string();
  65. let get_clerk = get_clerk.clone();
  66. async move {
  67. let value = get_clerk.get(&key).await.unwrap_or_default();
  68. let ret: Result<String, Infallible> =
  69. Ok(key + "!" + counter.as_str() + "!" + value.as_str());
  70. ret
  71. }
  72. }),
  73. );
  74. let put_clerk = clerk.clone();
  75. let app = app.route(
  76. "/kvstore/put",
  77. axum::routing::post(move |Json(body): Json<PutAppendBody>| {
  78. let put_clerk = put_clerk.clone();
  79. async move {
  80. put_clerk.put(body.key, body.value).await;
  81. let ret: Result<String, Infallible> = Ok("OK".to_string());
  82. ret
  83. }
  84. }),
  85. );
  86. let append_clerk = clerk.clone();
  87. let app = app.route(
  88. "/kvstore/append",
  89. axum::routing::post(move |Json(body): Json<PutAppendBody>| {
  90. let append_clerk = append_clerk.clone();
  91. async move {
  92. append_clerk.append(body.key, body.value).await;
  93. let ret: Result<String, Infallible> = Ok("OK".to_string());
  94. ret
  95. }
  96. }),
  97. );
  98. axum::Server::bind(&socket_addr)
  99. .serve(app.into_make_service())
  100. .await
  101. .expect("Webserver should not fail")
  102. }
  103. fn main() {
  104. let me: usize = std::env::args()
  105. .nth(1)
  106. .unwrap_or_default()
  107. .parse()
  108. .expect("An index of the current instance must be passed in");
  109. env_logger::init();
  110. // Run RPC servers in a thread pool. This pool
  111. // 1. Accepts incoming RPC connections for KV and Raft servers.
  112. // 2. Sends out RPCs to other Raft instances.
  113. // Timers are used by RPC handling code in the KV server.
  114. let rpc_server_thread_pool = tokio::runtime::Builder::new_multi_thread()
  115. .enable_all()
  116. .thread_name("durio-rpc")
  117. .build()
  118. .expect("Creating thread pool should not fail");
  119. let kv_server = rpc_server_thread_pool
  120. .block_on(run_kv_instance(KV_ADDRS[me], RAFT_ADDRS.clone(), me))
  121. .expect("Running kv instance should not fail");
  122. // Run web servers in a thread pool. This pool
  123. // 1. Accepts incoming HTTP connections.
  124. // 2. Sends out RPCs to KV instances, both local and remote.
  125. let thread_pool = tokio::runtime::Builder::new_multi_thread()
  126. .enable_all()
  127. .thread_name("durio-web")
  128. .build()
  129. .expect("Creating thread pool should not fail");
  130. thread_pool.block_on(run_web_server(
  131. WEB_ADDRS[me],
  132. kv_server,
  133. KV_ADDRS.clone(),
  134. ));
  135. }
  136. #[cfg(test)]
  137. mod tests {
  138. use super::*;
  139. use std::time::Duration;
  140. // This test is ignored by default, because it cannot be run with other
  141. // Ruaft tests at the same time. All other ruaft tests are compiled with
  142. // feature 'integration-test', which is in conflict with this test. This
  143. // test intends to verify that durio can be run under normal production
  144. // setup, i.e. without 'integration-test'.
  145. #[tokio::test]
  146. async fn smoke_test() {
  147. let kv_addrs: Vec<SocketAddr> = vec![
  148. ([0, 0, 0, 0], 9986).into(),
  149. ([0, 0, 0, 0], 9987).into(),
  150. ([0, 0, 0, 0], 9988).into(),
  151. ];
  152. let raft_addrs: Vec<SocketAddr> = vec![
  153. ([0, 0, 0, 0], 10006).into(),
  154. ([0, 0, 0, 0], 10007).into(),
  155. ([0, 0, 0, 0], 10008).into(),
  156. ];
  157. let web_addrs: Vec<SocketAddr> = vec![
  158. ([0, 0, 0, 0], 9006).into(),
  159. ([0, 0, 0, 0], 9007).into(),
  160. ([0, 0, 0, 0], 9008).into(),
  161. ];
  162. env_logger::init();
  163. // KV servers must be created before the web frontend can be run.
  164. let mut kv_servers = vec![];
  165. for me in 0..3 {
  166. let kv_server =
  167. run_kv_instance(kv_addrs[me], raft_addrs.clone(), me)
  168. .await
  169. .expect("Running kv instance should not fail");
  170. kv_servers.push(kv_server);
  171. }
  172. // All servers at `kv_addrs` must be up.
  173. for (me, kv_server) in kv_servers.into_iter().enumerate() {
  174. tokio::spawn(run_web_server(
  175. web_addrs[me],
  176. kv_server,
  177. kv_addrs.clone(),
  178. ));
  179. }
  180. tokio::time::sleep(Duration::from_millis(500)).await;
  181. let mut leader_count = 0;
  182. for i in 0..3 {
  183. let url = format!(
  184. "http://localhost:{}/kvstore/is_leader",
  185. web_addrs[i].port()
  186. );
  187. let is_leader = reqwest::get(url)
  188. .await
  189. .expect("HTTP request should not fail")
  190. .text()
  191. .await
  192. .expect("Results should be string");
  193. println!("is_leader: {}", is_leader);
  194. if is_leader.contains("true") {
  195. leader_count += 1;
  196. }
  197. }
  198. assert_eq!(1, leader_count);
  199. let body = PutAppendBody {
  200. key: "hello".to_owned(),
  201. value: "world".to_owned(),
  202. };
  203. for i in 0..3 {
  204. let client = reqwest::Client::new();
  205. client
  206. .post(format!(
  207. "http://localhost:{}/kvstore/put",
  208. web_addrs[i].port()
  209. ))
  210. .json(&body)
  211. .send()
  212. .await
  213. .expect("HTTP request should not fail");
  214. }
  215. for i in 0..3 {
  216. let url = format!(
  217. "http://localhost:{}/kvstore/get/{}",
  218. web_addrs[i].port(),
  219. "hello"
  220. );
  221. let result = reqwest::get(url)
  222. .await
  223. .expect("HTTP request should not fail")
  224. .text()
  225. .await
  226. .expect("hi");
  227. assert_eq!("hello!0!world", result);
  228. }
  229. }
  230. }