mod.rs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. use std::collections::HashMap;
  2. use std::sync::Arc;
  3. use std::time::Instant;
  4. pub use anyhow::Result;
  5. use parking_lot::Mutex;
  6. use rand::{thread_rng, Rng};
  7. use tokio::time::Duration;
  8. use ruaft::rpcs::register_server;
  9. use ruaft::{Persister, Raft, RpcClient, Snapshot};
  10. pub mod persister;
/// Mutable per-server harness state: the running Raft instances and
/// which servers are currently attached to the simulated network.
struct ConfigState {
    // `None` when the server at that index has been crashed via `crash1`.
    rafts: Vec<Option<Raft<i32>>>,
    // `connected[i]` is true when server `i` may exchange RPCs.
    connected: Vec<bool>,
}
/// Shared record of what every server has applied, used by the test
/// harness to verify agreement across peers.
struct LogState {
    // committed_logs[i][index] is the command server `i` applied at `index`.
    committed_logs: Vec<Vec<i32>>,
    // First apply-time error observed for each server, if any.
    results: Vec<Result<()>>,
    // Highest log index applied by any server so far.
    max_index: usize,
    // Per-server persisters holding each server's saved Raft state.
    saved: Vec<Arc<persister::Persister>>,
}
/// Test cluster configuration: owns the simulated network, the set of
/// Raft servers, and the shared apply-log bookkeeping.
pub struct Config {
    network: Arc<Mutex<labrpc::Network>>,
    server_count: usize,
    state: Mutex<ConfigState>,
    log: Arc<Mutex<LogState>>,
}
impl Config {
    /// Name under which server `i` registers on the simulated network.
    fn server_name(i: usize) -> String {
        format!("ruaft-server-{}", i)
    }

    /// Name of the RPC client that `client` uses to reach `server`.
    fn client_name(client: usize, server: usize) -> String {
        format!("ruaft-client-{}-to-{}", client, server)
    }

    /// Logs the start of a test case.
    pub fn begin<S: std::fmt::Display>(&self, msg: S) {
        eprintln!("{}", msg);
    }

    /// Waits until the connected servers agree on a single leader and
    /// returns that leader's index.
    ///
    /// Retries up to 10 rounds, sleeping a randomized ~half second per
    /// round. Fails if any single term has more than one leader, or if
    /// no leader emerges after all rounds.
    pub fn check_one_leader(&self) -> Result<usize> {
        for _ in 0..10 {
            // Randomized sleep so the checks do not stay in lock-step
            // with election timers.
            let millis = 450 + thread_rng().gen_range(0..100);
            sleep_millis(millis);
            // term -> indexes of servers claiming leadership in that term.
            let mut leaders = HashMap::new();
            let state = self.state.lock();
            for i in 0..self.server_count {
                if state.connected[i] {
                    if let Some(raft) = &state.rafts[i] {
                        let (term, is_leader) = raft.get_state();
                        if is_leader {
                            leaders.entry(term.0).or_insert(vec![]).push(i)
                        }
                    }
                }
            }
            let mut last_term_with_leader = 0;
            let mut last_leader = 0;
            for (term, leaders) in leaders {
                // At most one leader is allowed per term.
                if leaders.len() > 1 {
                    bail!("term {} has {} (>1) leaders", term, leaders.len());
                }
                if term > last_term_with_leader {
                    last_term_with_leader = term;
                    last_leader = leaders[0];
                }
            }
            if last_term_with_leader != 0 {
                return Ok(last_leader);
            }
        }
        Err(anyhow!("expected one leader, got none"))
    }

    /// Verifies that no connected server currently claims to be leader.
    pub fn check_no_leader(&self) -> Result<()> {
        let state = self.state.lock();
        for i in 0..self.server_count {
            if state.connected[i] {
                if let Some(raft) = &state.rafts[i] {
                    // get_state() returns (term, is_leader).
                    if raft.get_state().1 {
                        bail!(
                            "expected no leader, but {} claims to be leader",
                            i
                        );
                    }
                }
            }
        }
        Ok(())
    }

    /// Checks that every connected, running server reports the same
    /// term; returns it, or `None` when no such server exists.
    pub fn check_terms(&self) -> Result<Option<usize>> {
        let mut term = None;
        let state = self.state.lock();
        for i in 0..self.server_count {
            if state.connected[i] {
                if let Some(raft) = &state.rafts[i] {
                    let raft_term = raft.get_state().0;
                    if let Some(term) = term {
                        if term != raft_term {
                            bail!("Servers disagree on term")
                        }
                    } else {
                        // First term seen becomes the reference value.
                        term.replace(raft_term);
                    }
                }
            }
        }
        // Unwrap type Term into usize.
        Ok(term.map(|term| term.0))
    }

    /// Returns the number of peers that committed at least `index` commands,
    /// as well as the command at the index.
    ///
    /// Fails if any server recorded an apply-time error, or if two
    /// servers committed different commands at `index`.
    pub fn committed_count(&self, index: usize) -> Result<(usize, i32)> {
        let mut count = 0;
        let mut cmd = Self::INVALID_COMMAND;
        for i in 0..self.server_count {
            // The log lock is re-taken per server rather than held
            // across the whole loop.
            let log = self.log.lock();
            if let Err(e) = &log.results[i] {
                bail!(e.to_string())
            }
            if log.committed_logs[i].len() > index {
                let command = log.committed_logs[i][index];
                // Every server must agree on the command at `index`.
                if count > 0 && command != cmd {
                    bail!(
                        "committed values do not match: index {}, {}, {}",
                        index,
                        cmd,
                        command
                    )
                }
                count += 1;
                cmd = command;
            }
        }
        Ok((count, cmd))
    }

    /// Waits (with exponential backoff, at most 30 rounds) until at
    /// least `min_count` servers have committed the entry at `index`,
    /// and returns the committed command.
    ///
    /// If `at_term` is given and any server's term moves past it while
    /// waiting, gives up and returns `Ok(None)`.
    pub fn wait(
        &self,
        index: usize,
        min_count: usize,
        at_term: Option<usize>,
    ) -> Result<Option<i32>> {
        let mut sleep_time_mills = 10;
        for _ in 0..30 {
            let (count, _) = self.committed_count(index)?;
            if count >= min_count {
                break;
            }
            sleep_millis(sleep_time_mills);
            // Double the backoff, capped around one second.
            if sleep_time_mills < 1000 {
                sleep_time_mills <<= 1;
            }
            if let Some(at_term) = at_term {
                let state = self.state.lock();
                for raft in &state.rafts {
                    if let Some(raft) = raft {
                        let (term, _) = raft.get_state();
                        if term.0 > at_term {
                            // Term moved on; the entry may never commit.
                            return Ok(None);
                        }
                    }
                }
            }
        }
        // Re-check once more after the loop to produce a precise error.
        let (count, cmd) = self.committed_count(index)?;
        if count < min_count {
            bail!(
                "only {} decided for index {}; wanted {}",
                count,
                index,
                min_count
            )
        }
        Ok(Some(cmd))
    }

    /// Submits `cmd` until it is committed by at least
    /// `expected_servers` peers; returns the log index it committed at.
    ///
    /// Rotates through the servers looking for one that accepts the
    /// command, then polls up to 2 seconds for agreement; keeps
    /// retrying the whole cycle (when `retry`) for up to 10 seconds.
    pub fn one(
        &self,
        cmd: i32,
        expected_servers: usize,
        retry: bool,
    ) -> Result<usize> {
        let start = Instant::now();
        let mut cnt = 0;
        while start.elapsed() < Duration::from_secs(10) {
            let mut first_index = None;
            // Offer the command to each server once, round-robin.
            for _ in 0..self.server_count {
                cnt += 1;
                cnt %= self.server_count;
                let state = self.state.lock();
                if state.connected[cnt] {
                    if let Some(raft) = &state.rafts[cnt] {
                        // start() returns Some only when this server
                        // accepts the command.
                        if let Some((_, index)) = raft.start(cmd) {
                            first_index.replace(index);
                        }
                    }
                }
            }
            if let Some(index) = first_index {
                let agreement_start = Instant::now();
                while agreement_start.elapsed() < Duration::from_secs(2) {
                    let (commit_count, committed_command) =
                        self.committed_count(index)?;
                    if commit_count > 0
                        && commit_count >= expected_servers
                        && committed_command == cmd
                    {
                        // Enough peers committed it, with the right value.
                        return Ok(index);
                    }
                    sleep_millis(20);
                }
                if !retry {
                    break;
                }
            } else {
                // No server accepted the command; pause and retry.
                sleep_millis(50);
            }
        }
        Err(anyhow!("one({}) failed to reach agreement", cmd))
    }

    /// Reconnects server `index` to the network.
    pub fn connect(&self, index: usize) {
        self.set_connect(index, true);
    }

    /// Disconnects server `index` from the network.
    pub fn disconnect(&self, index: usize) {
        self.set_connect(index, false);
    }

    /// Enables or disables every RPC link between `index` and the other
    /// currently-connected servers.
    pub fn set_connect(&self, index: usize, yes: bool) {
        let mut state = self.state.lock();
        state.connected[index] = yes;
        let mut network = self.network.lock();
        // Outgoing clients.
        for j in 0..self.server_count {
            if state.connected[j] {
                network.set_enable_client(Self::client_name(index, j), yes)
            }
        }
        // Incoming clients.
        for j in 0..self.server_count {
            if state.connected[j] {
                network.set_enable_client(Self::client_name(j, index), yes);
            }
        }
    }

    /// Crashes server `index`: disconnects it, removes it from the
    /// network, kills its Raft instance, and copies its persisted state
    /// into a fresh persister so a later `start1` restores from it.
    pub fn crash1(&self, index: usize) {
        self.disconnect(index);
        self.network.lock().remove_server(Self::server_name(index));
        let raft = self.state.lock().rafts[index].take();
        // There is a potential race condition here. It can be produced by
        // 1. Leader sends an AppendEntries request to follower.
        // 2. Follower received the request but have not processed it.
        // 3. We removed follower from the network and took a snapshot of the
        // follower's state.
        // 4. Follower appended entries, replied to the leader. Note although
        // the follower is removed from the network, it can still send replies.
        // 5. The leader believes the entries are appended, but they are not.
        let data = self.log.lock().saved[index].read_state();
        // Make sure to give up the log lock before calling external code, which
        // might directly or indirectly block on the log lock, e.g. through
        // the apply command function.
        if let Some(raft) = raft {
            raft.kill();
        }
        let mut log = self.log.lock();
        log.saved[index] = Arc::new(persister::Persister::new());
        log.saved[index].save_state(data);
    }

    /// (Re)starts server `index`: builds RPC clients to every peer,
    /// creates a Raft instance restoring from the saved persister, and
    /// registers it on the network. Crashes the old instance first if
    /// one is still running.
    pub fn start1(&self, index: usize) -> Result<()> {
        if self.state.lock().rafts[index].is_some() {
            self.crash1(index);
        }
        let mut clients = vec![];
        {
            let mut network = self.network.lock();
            for j in 0..self.server_count {
                clients.push(RpcClient::new(network.make_client(
                    Self::client_name(index, j),
                    Self::server_name(j),
                )))
            }
        }
        let persister = self.log.lock().saved[index].clone();
        let log_clone = self.log.clone();
        let raft = Raft::new(
            clients,
            index,
            persister,
            // Record every applied command in the shared log state.
            move |cmd_index, cmd| {
                Self::apply_command(log_clone.clone(), index, cmd_index, cmd)
            },
            None,
            // Snapshots carry no payload in these tests.
            |index| Snapshot {
                last_included_index: index,
                data: vec![],
            },
        );
        self.state.lock().rafts[index].replace(raft.clone());
        let raft = Arc::new(raft);
        register_server(raft, Self::server_name(index), self.network.as_ref())?;
        Ok(())
    }

    /// Start a new command, returns (term, index).
    ///
    /// Returns `None` when the server does not accept the command.
    /// Panics (via `unwrap`) if the server at `leader` is not running.
    pub fn leader_start(
        &self,
        leader: usize,
        cmd: i32,
    ) -> Option<(usize, usize)> {
        self.state.lock().rafts[leader]
            .as_ref()
            .map(|raft| raft.start(cmd).map(|(term, index)| (term.0, index)))
            .unwrap()
    }

    /// Whether server `index` is currently connected to the network.
    pub fn is_connected(&self, index: usize) -> bool {
        self.state.lock().connected[index]
    }

    /// Whether server `index` has a running Raft instance.
    pub fn is_server_alive(&self, index: usize) -> bool {
        self.state.lock().rafts[index].is_some()
    }

    /// Total number of RPCs sent over the network so far.
    pub fn total_rpcs(&self) -> usize {
        self.network.lock().get_total_rpc_count()
    }

    /// Makes the network unreliable when `yes` is true (note the
    /// inversion: unreliable == !reliable).
    pub fn set_unreliable(&self, yes: bool) {
        self.network.lock().set_reliable(!yes);
    }

    /// Enables long message reordering on the network.
    pub fn set_long_reordering(&self, yes: bool) {
        self.network.lock().set_long_reordering(yes);
    }

    /// Marks the end of a test case. Currently a no-op.
    pub fn end(&self) {}

    /// Tears down the cluster: removes all servers from the network,
    /// stops the network daemon, then kills every Raft instance.
    pub fn cleanup(&self) {
        let mut network = self.network.lock();
        for i in 0..self.server_count {
            network.remove_server(Self::server_name(i));
        }
        network.stop();
        // Explicitly release the network lock before killing servers —
        // presumably kill() can block on network activity; confirm.
        drop(network);
        for raft in &mut self.state.lock().rafts {
            if let Some(raft) = raft.take() {
                raft.kill();
            }
        }
    }
}
impl Config {
    /// Placeholder stored in log slots that have not been applied yet.
    const INVALID_COMMAND: i32 = -1;

    /// Apply-callback shared by all servers: records that
    /// `server_index` applied `command` at log position `index`, and
    /// flags inconsistencies (mismatched commits, out-of-order applies)
    /// in `results` for later checks by `committed_count`.
    fn apply_command(
        log_state: Arc<Mutex<LogState>>,
        server_index: usize,
        index: usize,
        command: i32,
    ) {
        let mut log_state = log_state.lock();
        let committed_logs = &mut log_state.committed_logs;
        let mut err = None;
        // Agreement check: no other server may hold a different command
        // at the same index. Only the first mismatch is recorded.
        for (one_index, one_server) in committed_logs.iter().enumerate() {
            if one_server.len() > index && one_server[index] != command {
                err = Some((
                    one_index,
                    Err(anyhow!(
                        "commit index={} server={} {} != server={} {}",
                        index,
                        server_index,
                        command,
                        one_index,
                        one_server[index],
                    )),
                ));
                break;
            }
        }
        // Record the command, growing this server's log with
        // placeholders if needed.
        let one_server = &mut committed_logs[server_index];
        if one_server.len() <= index {
            one_server.resize(index + 1, Self::INVALID_COMMAND);
        }
        one_server[index] = command;
        // A placeholder immediately before `index` means an earlier
        // entry was skipped: this server applied out of order.
        if index > 1 && one_server[index - 1] == Self::INVALID_COMMAND {
            log_state.results[server_index] = Err(anyhow!(
                "server {} apply out of order {}",
                server_index,
                index
            ));
        } else if let Some((one_index, err)) = err {
            // Charge the mismatch to the previously-recorded server.
            log_state.results[one_index] = err
        }
        if index > log_state.max_index {
            log_state.max_index = index;
        }
    }
}
  382. pub fn make_config(server_count: usize, unreliable: bool) -> Config {
  383. let network = labrpc::Network::run_daemon();
  384. {
  385. let mut unlocked_network = network.lock();
  386. unlocked_network.set_reliable(!unreliable);
  387. unlocked_network.set_long_delays(true);
  388. }
  389. let state = Mutex::new(ConfigState {
  390. rafts: vec![None; server_count],
  391. connected: vec![true; server_count],
  392. });
  393. let mut saved = vec![];
  394. saved.resize_with(server_count, || Arc::new(persister::Persister::new()));
  395. let log = Arc::new(Mutex::new(LogState {
  396. committed_logs: vec![vec![]; server_count],
  397. results: vec![],
  398. max_index: 0,
  399. saved,
  400. }));
  401. log.lock().results.resize_with(server_count, || Ok(()));
  402. let cfg = Config {
  403. network,
  404. server_count,
  405. state,
  406. log,
  407. };
  408. for i in 0..server_count {
  409. cfg.start1(i).expect("Starting server should not fail");
  410. }
  411. cfg
  412. }
  413. pub fn sleep_millis(mills: u64) {
  414. std::thread::sleep(std::time::Duration::from_millis(mills))
  415. }
  416. pub const LONG_ELECTION_TIMEOUT_MILLIS: u64 = 1000;
  417. pub fn sleep_election_timeouts(count: u64) {
  418. sleep_millis(LONG_ELECTION_TIMEOUT_MILLIS * count)
  419. }