// mod.rs — test-harness configuration for a simulated ruaft cluster.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use anyhow::{anyhow, bail};
pub use anyhow::Result;
use parking_lot::Mutex;
use rand::{thread_rng, Rng};
use ruaft::rpcs::register_server;
use ruaft::utils::DropGuard;
use ruaft::{Persister, Raft, RpcClient};
use tokio::time::Duration;

pub mod persister;
/// Mutable per-server harness state: the running Raft instances and
/// whether each server is currently attached to the test network.
struct ConfigState {
    // One slot per server; `None` while the server is crashed.
    rafts: Vec<Option<Raft>>,
    // `connected[i]` is true when server `i` can exchange RPCs.
    connected: Vec<bool>,
}
/// Shared record of everything the servers have applied, used by the
/// harness to validate agreement between peers.
struct LogState {
    // committed_logs[i][j] is the command server `i` applied at log index `j`.
    committed_logs: Vec<Vec<i32>>,
    // Per-server status; set to Err when a server misbehaves (applies out of
    // order, or disagrees with another server at the same index).
    results: Vec<Result<()>>,
    // Highest log index applied by any server so far.
    max_index: usize,
    // Per-server persistent storage, swapped for a fresh copy on crash.
    saved: Vec<Arc<persister::Persister>>,
}
/// Test harness that owns a simulated cluster: the RPC test network, the
/// set of Raft instances, and the shared applied-command log used to check
/// agreement.
pub struct Config {
    // The simulated RPC network all servers communicate through.
    network: Arc<std::sync::Mutex<labrpc::Network>>,
    // Number of servers in the cluster; fixed at construction.
    server_count: usize,
    // Running instances and their connectivity flags.
    state: Mutex<ConfigState>,
    // Shared with every Raft apply callback, hence the Arc.
    log: Arc<Mutex<LogState>>,
}
  28. impl Config {
  29. fn server_name(i: usize) -> String {
  30. format!("ruaft-server-{}", i)
  31. }
  32. fn client_name(client: usize, server: usize) -> String {
  33. format!("ruaft-client-{}-to-{}", client, server)
  34. }
    /// Announces the start of a test scenario by printing `msg` to stderr.
    pub fn begin<S: std::fmt::Display>(&self, msg: S) {
        eprintln!("{}", msg);
    }
  38. pub fn check_one_leader(&self) -> Result<usize> {
  39. for _ in 0..10 {
  40. let millis = 450 + thread_rng().gen_range(0, 100);
  41. sleep_millis(millis);
  42. let mut leaders = HashMap::new();
  43. let state = self.state.lock();
  44. for i in 0..self.server_count {
  45. if state.connected[i] {
  46. if let Some(raft) = &state.rafts[i] {
  47. let (term, is_leader) = raft.get_state();
  48. if is_leader {
  49. leaders.entry(term.0).or_insert(vec![]).push(i)
  50. }
  51. }
  52. }
  53. }
  54. let mut last_term_with_leader = 0;
  55. let mut last_leader = 0;
  56. for (term, leaders) in leaders {
  57. if leaders.len() > 1 {
  58. bail!("term {} has {} (>1) leaders", term, leaders.len());
  59. }
  60. if term > last_term_with_leader {
  61. last_term_with_leader = term;
  62. last_leader = leaders[0];
  63. }
  64. }
  65. if last_term_with_leader != 0 {
  66. return Ok(last_leader);
  67. }
  68. }
  69. Err(anyhow!("expected one leader, got none"))
  70. }
  71. pub fn check_no_leader(&self) -> Result<()> {
  72. let state = self.state.lock();
  73. for i in 0..self.server_count {
  74. if state.connected[i] {
  75. if let Some(raft) = &state.rafts[i] {
  76. if raft.get_state().1 {
  77. bail!(
  78. "expected no leader, but {} claims to be leader",
  79. i
  80. );
  81. }
  82. }
  83. }
  84. }
  85. Ok(())
  86. }
  87. pub fn check_terms(&self) -> Result<Option<usize>> {
  88. let mut term = None;
  89. let state = self.state.lock();
  90. for i in 0..self.server_count {
  91. if state.connected[i] {
  92. if let Some(raft) = &state.rafts[i] {
  93. let raft_term = raft.get_state().0;
  94. if let Some(term) = term {
  95. if term != raft_term {
  96. bail!("Servers disagree on term")
  97. }
  98. } else {
  99. term.replace(raft_term);
  100. }
  101. }
  102. }
  103. }
  104. // Unwrap type Term into usize.
  105. Ok(term.map(|term| term.0))
  106. }
  107. /// Returns the number of peers that committed at least `index` commands,
  108. /// as well as the command at the index.
  109. pub fn committed_count(&self, index: usize) -> Result<(usize, i32)> {
  110. let mut count = 0;
  111. let mut cmd = Self::INVALID_COMMAND;
  112. for i in 0..self.server_count {
  113. let log = self.log.lock();
  114. if let Err(e) = &log.results[i] {
  115. bail!(e.to_string())
  116. }
  117. if log.committed_logs[i].len() > index {
  118. let command = log.committed_logs[i][index];
  119. if count > 0 && command != cmd {
  120. bail!(
  121. "committed values do not match: index {}, {}, {}",
  122. index,
  123. cmd,
  124. command
  125. )
  126. }
  127. count += 1;
  128. cmd = command;
  129. }
  130. }
  131. Ok((count, cmd))
  132. }
  133. pub fn wait(
  134. &self,
  135. index: usize,
  136. min_count: usize,
  137. at_term: Option<usize>,
  138. ) -> Result<Option<i32>> {
  139. let mut sleep_time_mills = 10;
  140. for _ in 0..30 {
  141. let (count, _) = self.committed_count(index)?;
  142. if count >= min_count {
  143. break;
  144. }
  145. sleep_millis(sleep_time_mills);
  146. if sleep_time_mills < 1000 {
  147. sleep_time_mills <<= 1;
  148. }
  149. if let Some(at_term) = at_term {
  150. let state = self.state.lock();
  151. for raft in &state.rafts {
  152. if let Some(raft) = raft {
  153. let (term, _) = raft.get_state();
  154. if term.0 > at_term {
  155. return Ok(None);
  156. }
  157. }
  158. }
  159. }
  160. }
  161. let (count, cmd) = self.committed_count(index)?;
  162. if count < min_count {
  163. bail!(
  164. "only {} decided for index {}; wanted {}",
  165. count,
  166. index,
  167. min_count
  168. )
  169. }
  170. Ok(Some(cmd))
  171. }
  172. pub fn one(
  173. &self,
  174. cmd: i32,
  175. expected_servers: usize,
  176. retry: bool,
  177. ) -> Result<usize> {
  178. let start = Instant::now();
  179. let mut cnt = 0;
  180. while start.elapsed() < Duration::from_secs(10) {
  181. let mut first_index = None;
  182. for _ in 0..self.server_count {
  183. cnt += 1;
  184. cnt %= self.server_count;
  185. let state = self.state.lock();
  186. if state.connected[cnt] {
  187. if let Some(raft) = &state.rafts[cnt] {
  188. if let Some((_, index)) =
  189. raft.start(ruaft::Command(cmd))
  190. {
  191. first_index.replace(index);
  192. }
  193. }
  194. }
  195. }
  196. if let Some(index) = first_index {
  197. let agreement_start = Instant::now();
  198. while agreement_start.elapsed() < Duration::from_secs(2) {
  199. let (commit_count, committed_command) =
  200. self.committed_count(index)?;
  201. if commit_count > 0
  202. && commit_count >= expected_servers
  203. && committed_command == cmd
  204. {
  205. return Ok(index);
  206. }
  207. sleep_millis(20);
  208. }
  209. if !retry {
  210. break;
  211. }
  212. } else {
  213. sleep_millis(50);
  214. }
  215. }
  216. Err(anyhow!("one({}) failed to reach agreement", cmd))
  217. }
  218. pub fn connect(&self, index: usize) {
  219. self.set_connect(index, true);
  220. }
  221. pub fn disconnect(&self, index: usize) {
  222. self.set_connect(index, false);
  223. }
  224. pub fn set_connect(&self, index: usize, yes: bool) {
  225. let mut state = self.state.lock();
  226. state.connected[index] = yes;
  227. let mut network = unlock(&self.network);
  228. // Outgoing clients.
  229. for j in 0..self.server_count {
  230. if state.connected[j] {
  231. network.set_enable_client(Self::client_name(index, j), yes)
  232. }
  233. }
  234. // Incoming clients.
  235. for j in 0..self.server_count {
  236. if state.connected[j] {
  237. network.set_enable_client(Self::client_name(j, index), yes);
  238. }
  239. }
  240. }
    /// Crashes server `index`: disconnects it, unregisters it from the
    /// network, shuts the Raft instance down, and swaps in a fresh persister
    /// seeded with a copy of its saved state so a later `start1` restarts
    /// from the same point while any lingering instance can no longer write.
    pub fn crash1(&self, index: usize) {
        self.disconnect(index);
        unlock(self.network.as_ref()).remove_server(Self::server_name(index));
        // Take the instance out first; copy its persisted state while we
        // still hold nothing that kill() might need.
        let raft = self.state.lock().rafts[index].take();
        let data = self.log.lock().saved[index].read_state();
        // Make sure to give up the log lock before calling external code, which
        // might directly or indirectly block on the log lock, e.g. through
        // the apply command function.
        if let Some(raft) = raft {
            raft.kill();
        }
        // Replace the persister so the dead instance's handle is orphaned.
        let mut log = self.log.lock();
        log.saved[index] = Arc::new(persister::Persister::new());
        log.saved[index].save_state(data);
    }
    /// (Re)starts server `index`: builds RPC clients to every peer, creates
    /// a Raft instance on top of the saved persister, wires its apply
    /// callback into the shared `LogState`, and registers it on the network.
    /// Crashes any still-running instance first.
    pub fn start1(&self, index: usize) -> Result<()> {
        if self.state.lock().rafts[index].is_some() {
            self.crash1(index);
        }
        let mut clients = vec![];
        {
            // Scope the network lock to client creation only.
            let mut network = unlock(&self.network);
            for j in 0..self.server_count {
                clients.push(RpcClient::new(network.make_client(
                    Self::client_name(index, j),
                    Self::server_name(j),
                )))
            }
        }
        let persister = self.log.lock().saved[index].clone();
        let log_clone = self.log.clone();
        // Every applied command is recorded (and cross-checked against the
        // other servers) by apply_command.
        let raft =
            Raft::new(clients, index, persister, move |cmd_index, cmd| {
                Self::apply_command(log_clone.clone(), index, cmd_index, cmd.0)
            });
        self.state.lock().rafts[index].replace(raft.clone());
        let raft = Arc::new(raft);
        register_server(raft, Self::server_name(index), self.network.as_ref())?;
        Ok(())
    }
  281. /// Start a new command, returns (term, index).
  282. pub fn leader_start(
  283. &self,
  284. leader: usize,
  285. cmd: i32,
  286. ) -> Option<(usize, usize)> {
  287. self.state.lock().rafts[leader]
  288. .as_ref()
  289. .map(|raft| {
  290. raft.start(ruaft::Command(cmd))
  291. .map(|(term, index)| (term.0, index))
  292. })
  293. .unwrap()
  294. }
  295. pub fn is_connected(&self, index: usize) -> bool {
  296. self.state.lock().connected[index]
  297. }
  298. pub fn is_server_alive(&self, index: usize) -> bool {
  299. self.state.lock().rafts[index].is_some()
  300. }
  301. pub fn total_rpcs(&self) -> usize {
  302. unlock(&self.network).get_total_rpc_count()
  303. }
  304. pub fn set_unreliable(&self, yes: bool) {
  305. unlock(&self.network).set_reliable(!yes);
  306. }
  307. pub fn set_long_reordering(&self, yes: bool) {
  308. unlock(&self.network).set_long_reordering(yes);
  309. }
  310. pub fn end(&self) {}
  311. pub fn cleanup(&self) {
  312. let mut network = unlock(&self.network);
  313. for i in 0..self.server_count {
  314. network.remove_server(Self::server_name(i));
  315. }
  316. for raft in &mut self.state.lock().rafts {
  317. if let Some(raft) = raft.take() {
  318. raft.kill();
  319. }
  320. }
  321. }
  322. pub fn deferred_cleanup(&self) -> impl Drop + '_ {
  323. DropGuard::new(move || self.cleanup())
  324. }
  325. }
impl Config {
    /// Sentinel stored in log slots that have not received a command yet.
    const INVALID_COMMAND: i32 = -1;

    /// Callback invoked by a Raft instance when it applies a command.
    ///
    /// Records `command` at `index` for `server_index` and cross-checks it
    /// against every other server's log. A mismatch with another server, or
    /// an out-of-order apply, stores an error in `results` (picked up later
    /// by `committed_count`).
    fn apply_command(
        log_state: Arc<Mutex<LogState>>,
        server_index: usize,
        index: usize,
        command: i32,
    ) {
        let mut log_state = log_state.lock();
        let committed_logs = &mut log_state.committed_logs;
        // First pass: did any other server commit a different command at
        // this index? Remember the offender; the error is recorded below.
        let mut err = None;
        for (one_index, one_server) in committed_logs.iter().enumerate() {
            if one_server.len() > index && one_server[index] != command {
                err = Some((
                    one_index,
                    Err(anyhow!(
                        "commit index={} server={} {} != server={} {}",
                        index,
                        server_index,
                        command,
                        one_index,
                        one_server[index],
                    )),
                ));
                break;
            }
        }
        // Record the command, padding any skipped slots with the sentinel.
        let one_server = &mut committed_logs[server_index];
        if one_server.len() <= index {
            one_server.resize(index + 1, Self::INVALID_COMMAND);
        }
        one_server[index] = command;
        // A sentinel immediately below us means this server skipped an
        // index. NOTE(review): the `index > 1` guard suggests log indexes
        // are 1-based (slot 0 unused) — confirm against ruaft.
        if index > 1 && one_server[index - 1] == Self::INVALID_COMMAND {
            log_state.results[server_index] = Err(anyhow!(
                "server {} apply out of order {}",
                server_index,
                index
            ));
        } else if let Some((one_index, err)) = err {
            log_state.results[one_index] = err
        }
        if index > log_state.max_index {
            log_state.max_index = index;
        }
    }
}
  372. fn unlock<T>(locked: &std::sync::Mutex<T>) -> std::sync::MutexGuard<T> {
  373. locked.lock().expect("Unlocking network should not fail")
  374. }
  375. pub fn make_config(server_count: usize, unreliable: bool) -> Config {
  376. let network = labrpc::Network::run_daemon();
  377. {
  378. let mut unlocked_network = unlock(&network);
  379. unlocked_network.set_reliable(!unreliable);
  380. unlocked_network.set_long_delays(true);
  381. }
  382. let state = Mutex::new(ConfigState {
  383. rafts: vec![None; server_count],
  384. connected: vec![true; server_count],
  385. });
  386. let mut saved = vec![];
  387. saved.resize_with(server_count, || Arc::new(persister::Persister::new()));
  388. let log = Arc::new(Mutex::new(LogState {
  389. committed_logs: vec![vec![]; server_count],
  390. results: vec![],
  391. max_index: 0,
  392. saved,
  393. }));
  394. log.lock().results.resize_with(server_count, || Ok(()));
  395. let cfg = Config {
  396. network,
  397. server_count,
  398. state,
  399. log,
  400. };
  401. for i in 0..server_count {
  402. cfg.start1(i).expect("Starting server should not fail");
  403. }
  404. cfg
  405. }
  406. pub fn sleep_millis(mills: u64) {
  407. std::thread::sleep(std::time::Duration::from_millis(mills))
  408. }
  409. pub const LONG_ELECTION_TIMEOUT_MILLIS: u64 = 1000;
  410. pub fn sleep_election_timeouts(count: u64) {
  411. sleep_millis(LONG_ELECTION_TIMEOUT_MILLIS * count)
  412. }