config.rs 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. use std::sync::Arc;
  2. use labrpc::Network;
  3. use parking_lot::Mutex;
  4. use rand::seq::SliceRandom;
  5. use rand::thread_rng;
  6. use kvraft::Clerk;
  7. use kvraft::KVServer;
  8. use crate::{register_kv_server, register_server, Persister, RpcClient};
  9. struct ConfigState {
  10. kv_servers: Vec<Option<Arc<KVServer>>>,
  11. next_clerk: usize,
  12. }
  13. pub struct Config {
  14. network: Arc<Mutex<labrpc::Network>>,
  15. server_count: usize,
  16. state: Mutex<ConfigState>,
  17. storage: Mutex<Vec<Option<Persister>>>,
  18. maxraftstate: usize,
  19. }
  20. impl Config {
  21. fn kv_clerk_name(i: usize, server: usize) -> String {
  22. format!("kvraft-clerk-client-{}-to-{}", i, server)
  23. }
  24. fn kv_server_name(i: usize) -> String {
  25. format!("kv-server-{}", i)
  26. }
  27. fn server_name(i: usize) -> String {
  28. format!("kvraft-server-{}", i)
  29. }
  30. fn client_name(client: usize, server: usize) -> String {
  31. format!("kvraft-client-{}-to-{}", client, server)
  32. }
  33. fn start_server(&self, index: usize) -> std::io::Result<()> {
  34. let mut clients = vec![];
  35. {
  36. let mut network = self.network.lock();
  37. for j in 0..self.server_count {
  38. clients.push(crate::RpcClient::new(network.make_client(
  39. Self::client_name(index, j),
  40. Self::server_name(j),
  41. )))
  42. }
  43. }
  44. let persister = self.storage.lock()[index]
  45. .take()
  46. .expect("A persister must be present to create a raft server");
  47. let kv =
  48. KVServer::new(clients, index, persister, Some(self.maxraftstate));
  49. self.state.lock().kv_servers[index].replace(kv.clone());
  50. let raft = kv.raft().clone();
  51. register_server(raft, Self::server_name(index), self.network.as_ref())?;
  52. register_kv_server(
  53. kv,
  54. Self::kv_server_name(index),
  55. self.network.as_ref(),
  56. )?;
  57. Ok(())
  58. }
  59. pub fn begin<S: std::fmt::Display>(&self, msg: S) {
  60. eprintln!("{}", msg);
  61. }
  62. fn shuffled_indexes(&self) -> Vec<usize> {
  63. let mut indexes: Vec<usize> = (0..self.server_count).collect();
  64. indexes.shuffle(&mut thread_rng());
  65. indexes
  66. }
  67. pub fn partition(&self) -> (Vec<usize>, Vec<usize>) {
  68. let state = self.state.lock();
  69. let mut indexes = self.shuffled_indexes();
  70. // Swap leader to position 0.
  71. let leader_position = indexes
  72. .iter()
  73. .position(|index| {
  74. state.kv_servers[*index]
  75. .as_ref()
  76. .map_or(false, |kv| kv.raft().get_state().1)
  77. })
  78. .unwrap_or(0);
  79. indexes.swap(0, leader_position);
  80. let part_one = indexes.split_off(indexes.len() / 2);
  81. let part_two = indexes;
  82. self.network_partition(&part_one, &part_two);
  83. (part_one, part_two)
  84. }
  85. pub fn random_partition(&self) -> (Vec<usize>, Vec<usize>) {
  86. let mut indexes = self.shuffled_indexes();
  87. let part_one = indexes.split_off(indexes.len() / 2);
  88. let part_two = indexes;
  89. self.network_partition(&part_one, &part_two);
  90. (part_one, part_two)
  91. }
  92. fn set_connect(
  93. network: &mut Network,
  94. from: &[usize],
  95. to: &[usize],
  96. yes: bool,
  97. ) {
  98. for i in from {
  99. for j in to {
  100. network.set_enable_client(Self::client_name(*i, *j), yes)
  101. }
  102. }
  103. }
  104. pub fn network_partition(&self, part_one: &[usize], part_two: &[usize]) {
  105. let mut network = self.network.lock();
  106. Self::set_connect(&mut network, part_one, part_two, false);
  107. Self::set_connect(&mut network, part_two, part_one, false);
  108. Self::set_connect(&mut network, part_one, part_one, true);
  109. Self::set_connect(&mut network, part_two, part_two, true);
  110. }
  111. pub fn connect_all(&self) {
  112. let all: Vec<usize> = (0..self.state.lock().kv_servers.len()).collect();
  113. let mut network = self.network.lock();
  114. Self::set_connect(&mut network, &all, &all, true);
  115. }
  116. fn crash_server(&self, index: usize) {
  117. {
  118. let all: Vec<usize> = (0..self.server_count).collect();
  119. let mut network = self.network.lock();
  120. Self::set_connect(&mut network, &all, &[index], false);
  121. Self::set_connect(&mut network, &[index], &all, false);
  122. network.remove_server(Self::server_name(index));
  123. network.remove_server(Self::kv_server_name(index));
  124. }
  125. if let Some(kv_server) = self.state.lock().kv_servers[index].take() {
  126. let persister = kv_server.raft().persister();
  127. let data = Persister::downcast_unsafe(persister.as_ref()).read();
  128. kv_server.kill();
  129. let persister = Persister::new();
  130. persister.restore(data);
  131. self.storage.lock()[index] = Some(persister);
  132. }
  133. }
  134. pub fn crash_all(&self) {
  135. for i in 0..self.server_count {
  136. self.crash_server(i);
  137. }
  138. }
  139. pub fn restart_all(&self) {
  140. for index in 0..self.server_count {
  141. self.start_server(index)
  142. .expect("Start server should never fail");
  143. }
  144. }
  145. fn set_clerk_connect(
  146. network: &mut Network,
  147. clerk_index: usize,
  148. to: &[usize],
  149. yes: bool,
  150. ) {
  151. for j in to {
  152. network.set_enable_client(Self::kv_clerk_name(clerk_index, *j), yes)
  153. }
  154. }
  155. pub fn make_limited_clerk(&self, to: &[usize]) -> Clerk {
  156. let mut clients = vec![];
  157. let clerk_index = {
  158. let mut state = self.state.lock();
  159. state.next_clerk += 1;
  160. state.next_clerk
  161. };
  162. {
  163. let mut network = self.network.lock();
  164. for j in 0..self.server_count {
  165. clients.push(RpcClient::new(network.make_client(
  166. Self::kv_clerk_name(clerk_index, j),
  167. Self::kv_server_name(j),
  168. )));
  169. }
  170. // Disable clerk connection to all kv servers.
  171. Self::set_clerk_connect(
  172. &mut network,
  173. clerk_index,
  174. &(0..self.server_count).collect::<Vec<usize>>(),
  175. false,
  176. );
  177. // Enable clerk connection to some servers.
  178. Self::set_clerk_connect(&mut network, clerk_index, to, true);
  179. }
  180. clients.shuffle(&mut thread_rng());
  181. Clerk::new(clients)
  182. }
  183. pub fn make_clerk(&self) -> Clerk {
  184. self.make_limited_clerk(&(0..self.server_count).collect::<Vec<usize>>())
  185. }
  186. pub fn connect_all_clerks(&self) {
  187. let mut network = self.network.lock();
  188. let all = &(0..self.server_count).collect::<Vec<usize>>();
  189. for clerk_index in 0..self.state.lock().next_clerk {
  190. Self::set_clerk_connect(&mut network, clerk_index + 1, all, true);
  191. }
  192. }
  193. pub fn end(&self) {}
  194. pub fn clean_up(&self) {
  195. let mut network = self.network.lock();
  196. for i in 0..self.server_count {
  197. network.remove_server(Self::server_name(i));
  198. network.remove_server(Self::kv_server_name(i));
  199. }
  200. network.stop();
  201. drop(network);
  202. for kv_server in &mut self.state.lock().kv_servers {
  203. if let Some(kv_server) = kv_server.take() {
  204. kv_server.kill();
  205. }
  206. }
  207. }
  208. }
  209. impl Config {
  210. fn check_size(
  211. &self,
  212. upper: usize,
  213. size_fn: impl Fn(&Persister) -> usize,
  214. ) -> Result<(), String> {
  215. let mut over_limits = String::new();
  216. for (index, p) in self.state.lock().kv_servers.iter().enumerate() {
  217. let p = p
  218. .as_ref()
  219. .expect("KV server must be running to check size")
  220. .raft()
  221. .persister();
  222. let size = size_fn(Persister::downcast_unsafe(p.as_ref()));
  223. if size > upper {
  224. let str = format!(" (index {}, size {})", index, size);
  225. over_limits.push_str(&str);
  226. }
  227. }
  228. if !over_limits.is_empty() {
  229. return Err(format!(
  230. "logs were not trimmed to {}:{}",
  231. upper, over_limits
  232. ));
  233. }
  234. Ok(())
  235. }
  236. pub fn check_log_size(&self, upper: usize) -> Result<(), String> {
  237. self.check_size(upper, ruaft::Persister::state_size)
  238. }
  239. pub fn check_snapshot_size(&self, upper: usize) -> Result<(), String> {
  240. self.check_size(upper, Persister::snapshot_size)
  241. }
  242. }
  243. pub fn make_config(
  244. server_count: usize,
  245. unreliable: bool,
  246. maxraftstate: usize,
  247. ) -> Config {
  248. let network = labrpc::Network::run_daemon();
  249. {
  250. let mut unlocked_network = network.lock();
  251. unlocked_network.set_reliable(!unreliable);
  252. unlocked_network.set_long_delays(true);
  253. }
  254. let state = Mutex::new(ConfigState {
  255. kv_servers: vec![None; server_count],
  256. next_clerk: 0,
  257. });
  258. let storage = Mutex::new(vec![]);
  259. storage
  260. .lock()
  261. .resize_with(server_count, || Some(Persister::new()));
  262. let cfg = Config {
  263. network,
  264. server_count,
  265. state,
  266. storage,
  267. maxraftstate,
  268. };
  269. for i in 0..server_count {
  270. cfg.start_server(i)
  271. .expect("Starting server should not fail");
  272. }
  273. cfg
  274. }