mod.rs 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. use parking_lot::Mutex;
  2. use rand::{thread_rng, Rng};
  3. use ruaft::rpcs::register_server;
  4. use ruaft::{Raft, RpcClient};
  5. use std::collections::HashMap;
  6. use std::sync::Arc;
  7. struct ConfigState {
  8. rafts: Vec<Option<Raft>>,
  9. connected: Vec<bool>,
  10. }
  11. pub struct Config {
  12. network: Arc<std::sync::Mutex<labrpc::Network>>,
  13. server_count: usize,
  14. state: Mutex<ConfigState>,
  15. }
  16. pub use anyhow::Result;
  17. impl Config {
  18. fn server_name(i: usize) -> String {
  19. format!("ruaft-server-{}", i)
  20. }
  21. fn client_name(client: usize, server: usize) -> String {
  22. format!("ruaft-client-{}-to-{}", client, server)
  23. }
  24. pub fn begin<S: std::fmt::Display>(&self, msg: S) {
  25. eprintln!("{}", msg);
  26. }
  27. pub fn check_one_leader(&self) -> Result<usize> {
  28. for _ in 0..10 {
  29. let millis = 450 + thread_rng().gen_range(0, 100);
  30. sleep_millis(millis);
  31. let mut leaders = HashMap::new();
  32. let state = self.state.lock();
  33. for i in 0..self.server_count {
  34. if state.connected[i] {
  35. if let Some(raft) = &state.rafts[i] {
  36. let (term, is_leader) = raft.get_state();
  37. if is_leader {
  38. leaders.entry(term.0).or_insert(vec![]).push(i)
  39. }
  40. }
  41. }
  42. }
  43. let mut last_term_with_leader = 0;
  44. let mut last_leader = 0;
  45. for (term, leaders) in leaders {
  46. if leaders.len() > 1 {
  47. bail!("term {} has {} (>1) leaders", term, leaders.len());
  48. }
  49. if term > last_term_with_leader {
  50. last_term_with_leader = term;
  51. last_leader = leaders[0];
  52. }
  53. }
  54. if last_term_with_leader != 0 {
  55. return Ok(last_leader);
  56. }
  57. }
  58. Err(anyhow!("expected one leader, got none"))
  59. }
  60. pub fn check_no_leader(&self) -> Result<()> {
  61. let state = self.state.lock();
  62. for i in 0..self.server_count {
  63. if state.connected[i] {
  64. if let Some(raft) = &state.rafts[i] {
  65. if raft.get_state().1 {
  66. bail!(
  67. "expected no leader, but {} claims to be leader",
  68. i
  69. );
  70. }
  71. }
  72. }
  73. }
  74. Ok(())
  75. }
  76. pub fn check_terms(&self) -> Result<()> {
  77. let mut term = None;
  78. let state = self.state.lock();
  79. for i in 0..self.server_count {
  80. if state.connected[i] {
  81. if let Some(raft) = &state.rafts[i] {
  82. let raft_term = raft.get_state().0;
  83. if let Some(term) = term {
  84. if term != raft_term {
  85. bail!("Servers disagree on term")
  86. }
  87. } else {
  88. term.replace(raft_term);
  89. }
  90. }
  91. }
  92. }
  93. Ok(())
  94. }
  95. pub fn connect(&self, index: usize) {
  96. self.set_connect(index, true);
  97. }
  98. pub fn disconnect(&self, index: usize) {
  99. self.set_connect(index, false);
  100. }
  101. pub fn set_connect(&self, index: usize, yes: bool) {
  102. self.state.lock().connected[index] = yes;
  103. let mut network = unlock(&self.network);
  104. // Outgoing clients.
  105. for j in 0..self.server_count {
  106. network.set_enable_client(Self::client_name(index, j), yes)
  107. }
  108. // Incoming clients.
  109. for j in 0..self.server_count {
  110. network.set_enable_client(Self::client_name(j, index), yes);
  111. }
  112. }
  113. pub fn crash1(&mut self, index: usize) {
  114. self.disconnect(index);
  115. unlock(self.network.as_ref()).remove_server(Self::server_name(index));
  116. let raft = {
  117. let mut state = self.state.lock();
  118. state.rafts[index].take()
  119. };
  120. if let Some(raft) = raft {
  121. raft.kill();
  122. }
  123. }
  124. pub fn start1(&mut self, index: usize) -> std::io::Result<()> {
  125. if self.state.lock().rafts[index].is_some() {
  126. self.crash1(index);
  127. }
  128. let mut clients = vec![];
  129. {
  130. let mut network = unlock(&self.network);
  131. for j in 0..self.server_count {
  132. clients.push(RpcClient::new(network.make_client(
  133. Self::client_name(index, j),
  134. Self::server_name(j),
  135. )))
  136. }
  137. }
  138. let raft = Raft::new(clients, index, |_, _| {});
  139. self.state.lock().rafts[index].replace(raft.clone());
  140. let raft = Arc::new(raft);
  141. register_server(raft, Self::server_name(index), self.network.as_ref())
  142. }
  143. pub fn end(&self) {}
  144. pub fn cleanup(&self) {
  145. let mut network = unlock(&self.network);
  146. for i in 0..self.server_count {
  147. network.remove_server(Self::server_name(i));
  148. }
  149. for raft in &mut self.state.lock().rafts {
  150. if let Some(raft) = raft.take() {
  151. raft.kill();
  152. }
  153. }
  154. }
  155. }
  156. fn unlock<T>(locked: &std::sync::Mutex<T>) -> std::sync::MutexGuard<T> {
  157. locked.lock().expect("Unlocking network should not fail")
  158. }
  159. pub fn make_config(server_count: usize, unreliable: bool) -> Config {
  160. let network = labrpc::Network::run_daemon();
  161. {
  162. let mut unlocked_network = unlock(&network);
  163. unlocked_network.set_reliable(!unreliable);
  164. unlocked_network.set_long_delays(true);
  165. }
  166. let state = Mutex::new(ConfigState {
  167. rafts: vec![None; server_count],
  168. connected: vec![true; server_count],
  169. });
  170. let mut cfg = Config {
  171. network,
  172. server_count,
  173. state,
  174. };
  175. for i in 0..server_count {
  176. cfg.start1(i).expect("Starting server should not fail");
  177. }
  178. cfg
  179. }
  180. pub fn sleep_millis(mills: u64) {
  181. std::thread::sleep(std::time::Duration::from_millis(mills))
  182. }
  183. const LONG_ELECTION_TIMEOUT_MILLIS: u64 = 1000;
  184. pub fn sleep_election_timeouts(count: u64) {
  185. sleep_millis(LONG_ELECTION_TIMEOUT_MILLIS * count)
  186. }