config.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. use std::collections::HashMap;
  2. use std::path::PathBuf;
  3. use std::sync::Arc;
  4. use std::time::{Duration, Instant};
  5. pub use anyhow::Result;
  6. use anyhow::{anyhow, bail};
  7. use parking_lot::Mutex;
  8. use rand::{thread_rng, Rng};
  9. use crate::register_server;
  10. use crate::utils::{sleep_millis, NO_SNAPSHOT};
  11. use ruaft::{ApplyCommandMessage, Persister, Raft, Term};
  /// Mutable bookkeeping about the simulated servers, guarded by `Config::state`.
  struct ConfigState {
      /// One slot per server. A slot is `None` until `start1` fills it and
      /// becomes `None` again when `crash1` or `cleanup` takes the instance out.
      rafts: Vec<Option<Raft<i32>>>,
      /// One flag per server, written by `set_connect`.
      connected: Vec<bool>,
  }
  /// State shared between `Config` and the apply callback given to each server.
  struct LogState {
      /// For each server, the commands it has applied, addressed by log index.
      /// Slots that were skipped over are filled with `Config::INVALID_COMMAND`.
      committed_logs: Vec<Vec<i32>>,
      /// For each server, `Ok(())` or the error recorded by `apply_command`.
      results: Vec<Result<()>>,
      /// Largest log index passed to `apply_command` so far.
      max_index: usize,
      /// For each server, the persister that is handed to its Raft instance.
      saved: Vec<Arc<crate::Persister>>,
  }
  /// Test harness that owns a simulated network and a set of Raft servers.
  pub struct Config {
      /// The simulated RPC network all servers and clients are attached to.
      network: Arc<Mutex<labrpc::Network>>,
      /// Number of servers in the cluster.
      server_count: usize,
      /// Per-server Raft instances and connectivity flags.
      state: Mutex<ConfigState>,
      /// Applied commands, apply errors and persisters; shared with the apply
      /// callbacks, hence the `Arc`.
      log: Arc<Mutex<LogState>>,
      /// Path of the log file returned by `test_utils::init_log`; printed by
      /// `cleanup`.
      log_file_path: PathBuf,
      /// Identifier of the running test; printed by `cleanup`.
      test_path: &'static str,
  }
  30. impl Config {
  /// Returns the network name of server `i`, e.g. `ruaft-server-3`.
  fn server_name(i: usize) -> String {
      let mut name = String::from("ruaft-server-");
      name.push_str(&i.to_string());
      name
  }
  /// Returns the network name of the client end that server `client` uses to
  /// talk to server `server`, e.g. `ruaft-client-0-to-2`.
  fn client_name(client: usize, server: usize) -> String {
      let from = client.to_string();
      let to = server.to_string();
      ["ruaft-client-", from.as_str(), "-to-", to.as_str()].concat()
  }
  37. pub fn begin<S: std::fmt::Display>(&self, msg: S) {
  38. eprintln!("{}", msg);
  39. }
  40. pub fn check_one_leader(&self) -> Result<usize> {
  41. for _ in 0..10 {
  42. let millis = 450 + thread_rng().gen_range(0..100);
  43. sleep_millis(millis);
  44. let mut leaders = HashMap::new();
  45. let state = self.state.lock();
  46. for i in 0..self.server_count {
  47. if state.connected[i] {
  48. if let Some(raft) = &state.rafts[i] {
  49. let (term, is_leader) = raft.get_state();
  50. if is_leader {
  51. leaders
  52. .entry(term.0)
  53. .or_insert_with(Vec::new)
  54. .push(i)
  55. }
  56. }
  57. }
  58. }
  59. let mut last_term_with_leader = 0;
  60. let mut last_leader = 0;
  61. for (term, leaders) in leaders {
  62. if leaders.len() > 1 {
  63. bail!("term {} has {} (>1) leaders", term, leaders.len());
  64. }
  65. if term > last_term_with_leader {
  66. last_term_with_leader = term;
  67. last_leader = leaders[0];
  68. }
  69. }
  70. if last_term_with_leader != 0 {
  71. return Ok(last_leader);
  72. }
  73. }
  74. Err(anyhow!("expected one leader, got none"))
  75. }
  76. pub fn check_no_leader(&self) -> Result<()> {
  77. let state = self.state.lock();
  78. for i in 0..self.server_count {
  79. if state.connected[i] {
  80. if let Some(raft) = &state.rafts[i] {
  81. if raft.get_state().1 {
  82. bail!(
  83. "expected no leader, but {} claims to be leader",
  84. i
  85. );
  86. }
  87. }
  88. }
  89. }
  90. Ok(())
  91. }
  92. pub fn check_terms(&self) -> Result<Option<usize>> {
  93. let mut term = None;
  94. let state = self.state.lock();
  95. for i in 0..self.server_count {
  96. if state.connected[i] {
  97. if let Some(raft) = &state.rafts[i] {
  98. let raft_term = raft.get_state().0;
  99. if let Some(term) = term {
  100. if term != raft_term {
  101. bail!("Servers disagree on term")
  102. }
  103. } else {
  104. term.replace(raft_term);
  105. }
  106. }
  107. }
  108. }
  109. // Unwrap type Term into usize.
  110. Ok(term.map(|term| term.0))
  111. }
  112. /// Returns the number of peers that committed at least `index` commands,
  113. /// as well as the command at the index.
  114. pub fn committed_count(&self, index: usize) -> Result<(usize, i32)> {
  115. let mut count = 0;
  116. let mut cmd = Self::INVALID_COMMAND;
  117. for i in 0..self.server_count {
  118. let log = self.log.lock();
  119. if let Err(e) = &log.results[i] {
  120. bail!(e.to_string())
  121. }
  122. if log.committed_logs[i].len() > index {
  123. let command = log.committed_logs[i][index];
  124. if count > 0 && command != cmd {
  125. bail!(
  126. "committed values do not match: index {}, {}, {}",
  127. index,
  128. cmd,
  129. command
  130. )
  131. }
  132. count += 1;
  133. cmd = command;
  134. }
  135. }
  136. Ok((count, cmd))
  137. }
  138. pub fn wait(
  139. &self,
  140. index: usize,
  141. min_count: usize,
  142. at_term: Option<usize>,
  143. ) -> Result<Option<i32>> {
  144. let mut sleep_time_mills = 10;
  145. for _ in 0..30 {
  146. let (count, _) = self.committed_count(index)?;
  147. if count >= min_count {
  148. break;
  149. }
  150. sleep_millis(sleep_time_mills);
  151. if sleep_time_mills < 1000 {
  152. sleep_time_mills <<= 1;
  153. }
  154. if let Some(at_term) = at_term {
  155. let state = self.state.lock();
  156. for raft in state.rafts.iter().flatten() {
  157. let (Term(term), _) = raft.get_state();
  158. if term > at_term {
  159. return Ok(None);
  160. }
  161. }
  162. }
  163. }
  164. let (count, cmd) = self.committed_count(index)?;
  165. if count < min_count {
  166. bail!(
  167. "only {} decided for index {}; wanted {}",
  168. count,
  169. index,
  170. min_count
  171. )
  172. }
  173. Ok(Some(cmd))
  174. }
  175. pub fn one(
  176. &self,
  177. cmd: i32,
  178. expected_servers: usize,
  179. retry: bool,
  180. ) -> Result<usize> {
  181. let start = Instant::now();
  182. let mut cnt = 0;
  183. while start.elapsed() < Duration::from_secs(10) {
  184. let mut first_index = None;
  185. for _ in 0..self.server_count {
  186. cnt += 1;
  187. cnt %= self.server_count;
  188. let state = self.state.lock();
  189. if state.connected[cnt] {
  190. if let Some(raft) = &state.rafts[cnt] {
  191. if let Some((_, index)) = raft.start(cmd) {
  192. first_index.replace(index);
  193. }
  194. }
  195. }
  196. }
  197. if let Some(index) = first_index {
  198. let agreement_start = Instant::now();
  199. while agreement_start.elapsed() < Duration::from_secs(2) {
  200. let (commit_count, committed_command) =
  201. self.committed_count(index)?;
  202. if commit_count > 0
  203. && commit_count >= expected_servers
  204. && committed_command == cmd
  205. {
  206. return Ok(index);
  207. }
  208. sleep_millis(20);
  209. }
  210. if !retry {
  211. break;
  212. }
  213. } else {
  214. sleep_millis(50);
  215. }
  216. }
  217. Err(anyhow!("one({}) failed to reach agreement", cmd))
  218. }
  219. pub fn connect(&self, index: usize) {
  220. self.set_connect(index, true);
  221. }
  222. pub fn disconnect(&self, index: usize) {
  223. self.set_connect(index, false);
  224. }
  225. pub fn set_connect(&self, index: usize, yes: bool) {
  226. let mut state = self.state.lock();
  227. state.connected[index] = yes;
  228. let mut network = self.network.lock();
  229. // Outgoing clients.
  230. for j in 0..self.server_count {
  231. if state.connected[j] {
  232. network.set_enable_client(Self::client_name(index, j), yes)
  233. }
  234. }
  235. // Incoming clients.
  236. for j in 0..self.server_count {
  237. if state.connected[j] {
  238. network.set_enable_client(Self::client_name(j, index), yes);
  239. }
  240. }
  241. }
  242. pub fn crash1(&self, index: usize) {
  243. self.disconnect(index);
  244. self.network.lock().remove_server(Self::server_name(index));
  245. let raft = self.state.lock().rafts[index].take();
  246. // There is a potential race condition here. It can be produced by
  247. // 1. Leader sends an AppendEntries request to follower.
  248. // 2. Follower received the request but have not processed it.
  249. // 3. We removed follower from the network and took a snapshot of the
  250. // follower's state.
  251. // 4. Follower appended entries, replied to the leader. Note although
  252. // the follower is removed from the network, it can still send replies.
  253. // 5. The leader believes the entries are appended, but they are not.
  254. let data = self.log.lock().saved[index].read_state();
  255. // Make sure to give up the log lock before calling external code, which
  256. // might directly or indirectly block on the log lock, e.g. through
  257. // the apply command function.
  258. if let Some(raft) = raft {
  259. raft.kill();
  260. }
  261. let mut log = self.log.lock();
  262. log.saved[index] = Arc::new(crate::Persister::new());
  263. log.saved[index].save_state(data);
  264. }
  265. pub fn start1(&self, index: usize) -> Result<()> {
  266. if self.state.lock().rafts[index].is_some() {
  267. self.crash1(index);
  268. }
  269. let mut clients = vec![];
  270. {
  271. let mut network = self.network.lock();
  272. for j in 0..self.server_count {
  273. clients.push(crate::RpcClient::new(network.make_client(
  274. Self::client_name(index, j),
  275. Self::server_name(j),
  276. )))
  277. }
  278. }
  279. let persister = self.log.lock().saved[index].clone();
  280. let log_clone = self.log.clone();
  281. let raft = Raft::new(
  282. clients,
  283. index,
  284. persister,
  285. move |message| {
  286. Self::apply_command(log_clone.clone(), index, message)
  287. },
  288. None,
  289. NO_SNAPSHOT,
  290. );
  291. self.state.lock().rafts[index].replace(raft.clone());
  292. let raft = Arc::new(raft);
  293. register_server(raft, Self::server_name(index), self.network.as_ref())?;
  294. Ok(())
  295. }
  296. /// Start a new command, returns (term, index).
  297. pub fn leader_start(
  298. &self,
  299. leader: usize,
  300. cmd: i32,
  301. ) -> Option<(usize, usize)> {
  302. self.state.lock().rafts[leader]
  303. .as_ref()
  304. .map(|raft| raft.start(cmd).map(|(term, index)| (term.0, index)))
  305. .unwrap()
  306. }
  307. pub fn is_connected(&self, index: usize) -> bool {
  308. self.state.lock().connected[index]
  309. }
  310. pub fn is_server_alive(&self, index: usize) -> bool {
  311. self.state.lock().rafts[index].is_some()
  312. }
  313. pub fn total_rpcs(&self) -> usize {
  314. self.network.lock().get_total_rpc_count()
  315. }
  316. pub fn set_unreliable(&self, yes: bool) {
  317. self.network.lock().set_reliable(!yes);
  318. }
  319. pub fn set_long_reordering(&self, yes: bool) {
  320. self.network.lock().set_long_reordering(yes);
  321. }
  /// Marks the end of a test. Currently does nothing.
  pub fn end(&self) {}
  323. pub fn cleanup(&self) {
  324. log::trace!("Cleaning up test raft.config ...");
  325. let mut network = self.network.lock();
  326. for i in 0..self.server_count {
  327. network.remove_server(Self::server_name(i));
  328. }
  329. network.stop();
  330. drop(network);
  331. for raft in &mut self.state.lock().rafts {
  332. if let Some(raft) = raft.take() {
  333. raft.kill();
  334. }
  335. }
  336. log::trace!("Cleaning up test raft.config done.");
  337. eprintln!(
  338. "Ruaft log file for {}: {:?}",
  339. self.test_path,
  340. self.log_file_path.as_os_str()
  341. );
  342. }
  343. }
  344. impl Config {
  345. const INVALID_COMMAND: i32 = -1;
  346. fn apply_command(
  347. log_state: Arc<Mutex<LogState>>,
  348. server_index: usize,
  349. message: ApplyCommandMessage<i32>,
  350. ) {
  351. let (index, command) =
  352. if let ApplyCommandMessage::Command(index, command) = message {
  353. (index, command)
  354. } else {
  355. // Ignore snapshots.
  356. return;
  357. };
  358. let mut log_state = log_state.lock();
  359. let committed_logs = &mut log_state.committed_logs;
  360. let mut err = None;
  361. for (one_index, one_server) in committed_logs.iter().enumerate() {
  362. if one_server.len() > index && one_server[index] != command {
  363. err = Some((
  364. one_index,
  365. Err(anyhow!(
  366. "commit index={} server={} {} != server={} {}",
  367. index,
  368. server_index,
  369. command,
  370. one_index,
  371. one_server[index],
  372. )),
  373. ));
  374. break;
  375. }
  376. }
  377. let one_server = &mut committed_logs[server_index];
  378. if one_server.len() <= index {
  379. one_server.resize(index + 1, Self::INVALID_COMMAND);
  380. }
  381. one_server[index] = command;
  382. if index > 1 && one_server[index - 1] == Self::INVALID_COMMAND {
  383. log_state.results[server_index] = Err(anyhow!(
  384. "server {} apply out of order {}",
  385. server_index,
  386. index
  387. ));
  388. } else if let Some((one_index, err)) = err {
  389. log_state.results[one_index] = err
  390. }
  391. if index > log_state.max_index {
  392. log_state.max_index = index;
  393. }
  394. }
  395. }
  /// Creates a test `Config` without having to spell out the test path.
  ///
  /// Expands to a call to `$crate::raft::config::make_config`, forwarding
  /// `$server_count` and `$unreliable` unchanged and passing
  /// `stdext::function_name!()` as the `test_path` argument.
  #[macro_export]
  macro_rules! make_config {
      ($server_count:expr, $unreliable:expr) => {
          $crate::raft::config::make_config(
              $server_count,
              $unreliable,
              stdext::function_name!(),
          )
      };
  }
  406. pub fn make_config(
  407. server_count: usize,
  408. unreliable: bool,
  409. test_path: &'static str,
  410. ) -> Config {
  411. // Create a logger first.
  412. let log_file_path = test_utils::init_log(test_path)
  413. .expect("Test log file creation should never fail");
  414. let network = labrpc::Network::run_daemon();
  415. {
  416. let mut unlocked_network = network.lock();
  417. unlocked_network.set_reliable(!unreliable);
  418. unlocked_network.set_long_delays(true);
  419. }
  420. let state = Mutex::new(ConfigState {
  421. rafts: vec![None; server_count],
  422. connected: vec![true; server_count],
  423. });
  424. let mut saved = vec![];
  425. saved.resize_with(server_count, || Arc::new(crate::Persister::new()));
  426. let log = Arc::new(Mutex::new(LogState {
  427. committed_logs: vec![vec![]; server_count],
  428. results: vec![],
  429. max_index: 0,
  430. saved,
  431. }));
  432. log.lock().results.resize_with(server_count, || Ok(()));
  433. let cfg = Config {
  434. network,
  435. server_count,
  436. state,
  437. log,
  438. log_file_path,
  439. test_path,
  440. };
  441. for i in 0..server_count {
  442. cfg.start1(i).expect("Starting server should not fail");
  443. }
  444. cfg
  445. }