mod.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. use std::collections::HashMap;
  2. use std::sync::Arc;
  3. use std::time::Instant;
  4. pub use anyhow::Result;
  5. use parking_lot::Mutex;
  6. use rand::{thread_rng, Rng};
  7. use tokio::time::Duration;
  8. use ruaft::rpcs::register_server;
  9. use ruaft::utils::DropGuard;
  10. use ruaft::{Raft, RpcClient};
  11. struct ConfigState {
  12. rafts: Vec<Option<Raft>>,
  13. connected: Vec<bool>,
  14. }
  15. struct LogState {
  16. committed_logs: Vec<Vec<i32>>,
  17. results: Vec<Result<()>>,
  18. max_index: usize,
  19. }
  20. pub struct Config {
  21. network: Arc<std::sync::Mutex<labrpc::Network>>,
  22. server_count: usize,
  23. state: Mutex<ConfigState>,
  24. log: Arc<Mutex<LogState>>,
  25. }
  26. impl Config {
  27. fn server_name(i: usize) -> String {
  28. format!("ruaft-server-{}", i)
  29. }
  30. fn client_name(client: usize, server: usize) -> String {
  31. format!("ruaft-client-{}-to-{}", client, server)
  32. }
  33. pub fn begin<S: std::fmt::Display>(&self, msg: S) {
  34. eprintln!("{}", msg);
  35. }
  36. pub fn check_one_leader(&self) -> Result<usize> {
  37. for _ in 0..10 {
  38. let millis = 450 + thread_rng().gen_range(0, 100);
  39. sleep_millis(millis);
  40. let mut leaders = HashMap::new();
  41. let state = self.state.lock();
  42. for i in 0..self.server_count {
  43. if state.connected[i] {
  44. if let Some(raft) = &state.rafts[i] {
  45. let (term, is_leader) = raft.get_state();
  46. if is_leader {
  47. leaders.entry(term.0).or_insert(vec![]).push(i)
  48. }
  49. }
  50. }
  51. }
  52. let mut last_term_with_leader = 0;
  53. let mut last_leader = 0;
  54. for (term, leaders) in leaders {
  55. if leaders.len() > 1 {
  56. bail!("term {} has {} (>1) leaders", term, leaders.len());
  57. }
  58. if term > last_term_with_leader {
  59. last_term_with_leader = term;
  60. last_leader = leaders[0];
  61. }
  62. }
  63. if last_term_with_leader != 0 {
  64. return Ok(last_leader);
  65. }
  66. }
  67. Err(anyhow!("expected one leader, got none"))
  68. }
  69. pub fn check_no_leader(&self) -> Result<()> {
  70. let state = self.state.lock();
  71. for i in 0..self.server_count {
  72. if state.connected[i] {
  73. if let Some(raft) = &state.rafts[i] {
  74. if raft.get_state().1 {
  75. bail!(
  76. "expected no leader, but {} claims to be leader",
  77. i
  78. );
  79. }
  80. }
  81. }
  82. }
  83. Ok(())
  84. }
  85. pub fn check_terms(&self) -> Result<()> {
  86. let mut term = None;
  87. let state = self.state.lock();
  88. for i in 0..self.server_count {
  89. if state.connected[i] {
  90. if let Some(raft) = &state.rafts[i] {
  91. let raft_term = raft.get_state().0;
  92. if let Some(term) = term {
  93. if term != raft_term {
  94. bail!("Servers disagree on term")
  95. }
  96. } else {
  97. term.replace(raft_term);
  98. }
  99. }
  100. }
  101. }
  102. Ok(())
  103. }
  104. /// Returns the number of peers that committed at least `index` commands,
  105. /// as well as the command at the index.
  106. pub fn committed_count(&self, index: usize) -> Result<(usize, i32)> {
  107. let mut count = 0;
  108. let mut cmd = Self::INVALID_COMMAND;
  109. for i in 0..self.server_count {
  110. let log = self.log.lock();
  111. if let Err(e) = &log.results[i] {
  112. bail!(e.to_string())
  113. }
  114. if log.committed_logs[i].len() > index {
  115. let command = log.committed_logs[i][index];
  116. if count > 0 {
  117. assert_eq!(
  118. command, cmd,
  119. "committed values do not match: index {}, {}, {}",
  120. index, cmd, command
  121. )
  122. }
  123. count += 1;
  124. cmd = command;
  125. }
  126. }
  127. Ok((count, cmd))
  128. }
  129. pub fn one(
  130. &self,
  131. cmd: i32,
  132. expected_servers: usize,
  133. retry: bool,
  134. ) -> Result<usize> {
  135. let start = Instant::now();
  136. let mut cnt = 0;
  137. while start.elapsed() < Duration::from_secs(10) {
  138. let mut first_index = None;
  139. for _ in 0..self.server_count {
  140. cnt += 1;
  141. cnt %= self.server_count;
  142. let state = self.state.lock();
  143. if state.connected[cnt] {
  144. if let Some(raft) = &state.rafts[cnt] {
  145. if let Some((_, index)) =
  146. raft.start(ruaft::Command(cmd))
  147. {
  148. first_index.replace(index);
  149. }
  150. }
  151. }
  152. }
  153. if let Some(index) = first_index {
  154. let agreement_start = Instant::now();
  155. while agreement_start.elapsed() < Duration::from_secs(2) {
  156. let (commit_count, committed_command) =
  157. self.committed_count(index)?;
  158. if commit_count > 0
  159. && commit_count >= expected_servers
  160. && committed_command == cmd
  161. {
  162. return Ok(index);
  163. }
  164. sleep_millis(20);
  165. }
  166. if !retry {
  167. break;
  168. }
  169. } else {
  170. sleep_millis(50);
  171. }
  172. }
  173. Err(anyhow!("one({}) failed to reach agreement", cmd))
  174. }
  175. pub fn connect(&self, index: usize) {
  176. self.set_connect(index, true);
  177. }
  178. pub fn disconnect(&self, index: usize) {
  179. self.set_connect(index, false);
  180. }
  181. pub fn set_connect(&self, index: usize, yes: bool) {
  182. self.state.lock().connected[index] = yes;
  183. let mut network = unlock(&self.network);
  184. // Outgoing clients.
  185. for j in 0..self.server_count {
  186. network.set_enable_client(Self::client_name(index, j), yes)
  187. }
  188. // Incoming clients.
  189. for j in 0..self.server_count {
  190. network.set_enable_client(Self::client_name(j, index), yes);
  191. }
  192. }
  193. pub fn crash1(&mut self, index: usize) {
  194. self.disconnect(index);
  195. unlock(self.network.as_ref()).remove_server(Self::server_name(index));
  196. let raft = {
  197. let mut state = self.state.lock();
  198. state.rafts[index].take()
  199. };
  200. if let Some(raft) = raft {
  201. raft.kill();
  202. }
  203. }
  204. pub fn start1(&mut self, index: usize) -> Result<()> {
  205. if self.state.lock().rafts[index].is_some() {
  206. self.crash1(index);
  207. }
  208. let mut clients = vec![];
  209. {
  210. let mut network = unlock(&self.network);
  211. for j in 0..self.server_count {
  212. clients.push(RpcClient::new(network.make_client(
  213. Self::client_name(index, j),
  214. Self::server_name(j),
  215. )))
  216. }
  217. }
  218. let log_clone = self.log.clone();
  219. let raft = Raft::new(clients, index, move |cmd_index, cmd| {
  220. Self::apply_command(log_clone.clone(), index, cmd_index, cmd.0)
  221. });
  222. self.state.lock().rafts[index].replace(raft.clone());
  223. let raft = Arc::new(raft);
  224. register_server(raft, Self::server_name(index), self.network.as_ref())?;
  225. Ok(())
  226. }
  227. pub fn leader_start(
  228. &self,
  229. leader: usize,
  230. cmd: i32,
  231. ) -> Option<(usize, usize)> {
  232. self.state.lock().rafts[leader]
  233. .as_ref()
  234. .map(|raft| {
  235. raft.start(ruaft::Command(cmd))
  236. .map(|(term, index)| (term.0, index))
  237. })
  238. .unwrap()
  239. }
  240. pub fn end(&self) {}
  241. pub fn cleanup(&self) {
  242. let mut network = unlock(&self.network);
  243. for i in 0..self.server_count {
  244. network.remove_server(Self::server_name(i));
  245. }
  246. for raft in &mut self.state.lock().rafts {
  247. if let Some(raft) = raft.take() {
  248. raft.kill();
  249. }
  250. }
  251. }
  252. pub fn deferred_cleanup(&self) -> impl Drop + '_ {
  253. DropGuard::new(move || self.cleanup())
  254. }
  255. }
  256. impl Config {
  257. const INVALID_COMMAND: i32 = -1;
  258. fn apply_command(
  259. log_state: Arc<Mutex<LogState>>,
  260. server_index: usize,
  261. index: usize,
  262. command: i32,
  263. ) {
  264. let mut log_state = log_state.lock();
  265. let committed_logs = &mut log_state.committed_logs;
  266. let mut err = None;
  267. for (one_index, one_server) in committed_logs.iter().enumerate() {
  268. if one_server.len() > index && one_server[index] != command {
  269. err = Some((
  270. one_index,
  271. Err(anyhow!(
  272. "commit index ={} server={} {} != server={} {}",
  273. index,
  274. server_index,
  275. command,
  276. one_index,
  277. one_server[index],
  278. )),
  279. ));
  280. break;
  281. }
  282. }
  283. let one_server = &mut committed_logs[server_index];
  284. if one_server.len() <= index {
  285. one_server.resize(index + 1, Self::INVALID_COMMAND);
  286. }
  287. one_server[index] = command;
  288. if index > 1 && one_server[index - 1] == Self::INVALID_COMMAND {
  289. log_state.results[server_index] = Err(anyhow!(
  290. "server {} apply out of order {}",
  291. server_index,
  292. index
  293. ));
  294. } else if let Some((one_index, err)) = err {
  295. log_state.results[one_index] = err
  296. }
  297. if index > log_state.max_index {
  298. log_state.max_index = index;
  299. }
  300. }
  301. }
  /// Locks a `std::sync::Mutex` (used for the network), panicking if the
  /// mutex has been poisoned.
  fn unlock<T>(locked: &std::sync::Mutex<T>) -> std::sync::MutexGuard<'_, T> {
      let lock_result = locked.lock();
      lock_result.expect("Unlocking network should not fail")
  }
  305. pub fn make_config(server_count: usize, unreliable: bool) -> Config {
  306. let network = labrpc::Network::run_daemon();
  307. {
  308. let mut unlocked_network = unlock(&network);
  309. unlocked_network.set_reliable(!unreliable);
  310. unlocked_network.set_long_delays(true);
  311. }
  312. let state = Mutex::new(ConfigState {
  313. rafts: vec![None; server_count],
  314. connected: vec![true; server_count],
  315. });
  316. let log = Arc::new(Mutex::new(LogState {
  317. committed_logs: vec![vec![]; server_count],
  318. results: vec![],
  319. max_index: 0,
  320. }));
  321. log.lock().results.resize_with(server_count, || Ok(()));
  322. let mut cfg = Config {
  323. network,
  324. server_count,
  325. state,
  326. log,
  327. };
  328. for i in 0..server_count {
  329. cfg.start1(i).expect("Starting server should not fail");
  330. }
  331. cfg
  332. }
  /// Blocks the current thread for `mills` milliseconds.
  pub fn sleep_millis(mills: u64) {
      let pause = std::time::Duration::from_millis(mills);
      std::thread::sleep(pause)
  }
  /// Length, in milliseconds, of one "long" election timeout used by
  /// `sleep_election_timeouts`.
  const LONG_ELECTION_TIMEOUT_MILLIS: u64 = 1_000;
  337. pub fn sleep_election_timeouts(count: u64) {
  338. sleep_millis(LONG_ELECTION_TIMEOUT_MILLIS * count)
  339. }