mod.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. use std::collections::HashMap;
  2. use std::path::PathBuf;
  3. use std::rc::Rc;
  4. use std::sync::Arc;
  5. use std::time::Instant;
  6. pub use anyhow::Result;
  7. use anyhow::{anyhow, bail};
  8. use parking_lot::Mutex;
  9. use rand::{thread_rng, Rng};
  10. use tokio::time::Duration;
  11. use ruaft::{ApplyCommandMessage, Persister, Raft, Term};
  12. use test_configs::register_server;
  13. pub mod persister;
/// Mutable per-server bookkeeping for the test harness, guarded by
/// `Config::state`.
struct ConfigState {
    // One Raft instance per server; `None` while that server is crashed
    // (taken out in `crash1`, re-inserted in `start1`).
    rafts: Vec<Option<Raft<i32>>>,
    // Whether each server is currently attached to the simulated network.
    connected: Vec<bool>,
}
/// Commit-log bookkeeping shared between the harness and the per-server
/// apply callbacks, guarded by `Config::log`.
struct LogState {
    // Per-server record of applied commands, indexed by log index. Slots
    // never applied hold `Config::INVALID_COMMAND`.
    committed_logs: Vec<Vec<i32>>,
    // Per-server health: set to `Err` by `apply_command` when a consistency
    // violation is detected; checked by `committed_count`.
    results: Vec<Result<()>>,
    // Highest log index applied by any server so far.
    max_index: usize,
    // Per-server persisters holding each server's saved Raft state; swapped
    // for a fresh copy on crash so the old instance cannot keep writing.
    saved: Vec<Arc<persister::Persister>>,
}
/// Test harness for a cluster of Raft servers wired to a simulated,
/// fault-injectable network.
pub struct Config {
    // The simulated RPC network shared by all servers and clients.
    network: Arc<Mutex<labrpc::Network>>,
    // Number of Raft servers in the cluster (fixed at construction).
    server_count: usize,
    // Per-server run state; see `ConfigState`.
    state: Mutex<ConfigState>,
    // Shared commit-log state; `Arc` because apply callbacks hold a clone.
    log: Arc<Mutex<LogState>>,
    // Where this test's log output is written; printed in `cleanup`.
    log_file_path: PathBuf,
    // The test's function path, used to label output.
    test_path: &'static str,
}
impl Config {
    /// RPC server name for server `i` on the simulated network.
    fn server_name(i: usize) -> String {
        format!("ruaft-server-{}", i)
    }

    /// RPC client name for the link from `client` to `server`.
    fn client_name(client: usize, server: usize) -> String {
        format!("ruaft-client-{}-to-{}", client, server)
    }

    /// Announces the start of a test case on stderr.
    pub fn begin<S: std::fmt::Display>(&self, msg: S) {
        eprintln!("{}", msg);
    }

    /// Polls until a single leader emerges among the connected servers and
    /// returns its index.
    ///
    /// Retries up to 10 times, sleeping 450-550 ms between attempts to give
    /// elections time to settle. Fails if any single term has more than one
    /// leader, or if no leader appears at all.
    pub fn check_one_leader(&self) -> Result<usize> {
        for _ in 0..10 {
            // Randomized sleep to avoid locking step with election timers.
            let millis = 450 + thread_rng().gen_range(0..100);
            sleep_millis(millis);
            // Map from term number to the servers claiming leadership in it.
            let mut leaders = HashMap::new();
            let state = self.state.lock();
            for i in 0..self.server_count {
                if state.connected[i] {
                    if let Some(raft) = &state.rafts[i] {
                        let (term, is_leader) = raft.get_state();
                        if is_leader {
                            leaders
                                .entry(term.0)
                                .or_insert_with(Vec::new)
                                .push(i)
                        }
                    }
                }
            }
            // Take the leader of the most recent term that has one; two
            // leaders in the same term is a safety violation.
            let mut last_term_with_leader = 0;
            let mut last_leader = 0;
            for (term, leaders) in leaders {
                if leaders.len() > 1 {
                    bail!("term {} has {} (>1) leaders", term, leaders.len());
                }
                if term > last_term_with_leader {
                    last_term_with_leader = term;
                    last_leader = leaders[0];
                }
            }
            if last_term_with_leader != 0 {
                return Ok(last_leader);
            }
        }
        Err(anyhow!("expected one leader, got none"))
    }

    /// Asserts that no connected server currently claims to be leader.
    pub fn check_no_leader(&self) -> Result<()> {
        let state = self.state.lock();
        for i in 0..self.server_count {
            if state.connected[i] {
                if let Some(raft) = &state.rafts[i] {
                    // get_state() returns (term, is_leader).
                    if raft.get_state().1 {
                        bail!(
                            "expected no leader, but {} claims to be leader",
                            i
                        );
                    }
                }
            }
        }
        Ok(())
    }

    /// Checks that every connected, live server reports the same term.
    ///
    /// Returns that term, or `None` when no connected live server exists.
    pub fn check_terms(&self) -> Result<Option<usize>> {
        let mut term = None;
        let state = self.state.lock();
        for i in 0..self.server_count {
            if state.connected[i] {
                if let Some(raft) = &state.rafts[i] {
                    let raft_term = raft.get_state().0;
                    if let Some(term) = term {
                        if term != raft_term {
                            bail!("Servers disagree on term")
                        }
                    } else {
                        term.replace(raft_term);
                    }
                }
            }
        }
        // Unwrap type Term into usize.
        Ok(term.map(|term| term.0))
    }

    /// Returns the number of peers that committed at least `index` commands,
    /// as well as the command at the index.
    pub fn committed_count(&self, index: usize) -> Result<(usize, i32)> {
        let mut count = 0;
        let mut cmd = Self::INVALID_COMMAND;
        for i in 0..self.server_count {
            // NOTE(review): the log lock is re-acquired per server,
            // presumably so apply callbacks can interleave — confirm before
            // hoisting it out of the loop.
            let log = self.log.lock();
            // Surface any consistency violation recorded by apply_command.
            if let Err(e) = &log.results[i] {
                bail!(e.to_string())
            }
            if log.committed_logs[i].len() > index {
                let command = log.committed_logs[i][index];
                // All servers must agree on the command at a committed index.
                if count > 0 && command != cmd {
                    bail!(
                        "committed values do not match: index {}, {}, {}",
                        index,
                        cmd,
                        command
                    )
                }
                count += 1;
                cmd = command;
            }
        }
        Ok((count, cmd))
    }

    /// Waits with exponential backoff until at least `min_count` servers have
    /// committed the entry at `index`, returning the committed command.
    ///
    /// When `at_term` is given, returns `Ok(None)` as soon as any live server
    /// reports a term beyond it, since the entry may then never commit.
    pub fn wait(
        &self,
        index: usize,
        min_count: usize,
        at_term: Option<usize>,
    ) -> Result<Option<i32>> {
        let mut sleep_time_mills = 10;
        for _ in 0..30 {
            let (count, _) = self.committed_count(index)?;
            if count >= min_count {
                break;
            }
            sleep_millis(sleep_time_mills);
            // Double the backoff each round, capping near one second.
            if sleep_time_mills < 1000 {
                sleep_time_mills <<= 1;
            }
            if let Some(at_term) = at_term {
                let state = self.state.lock();
                for raft in state.rafts.iter().flatten() {
                    let (Term(term), _) = raft.get_state();
                    if term > at_term {
                        // The cluster moved past `at_term`; stop waiting.
                        return Ok(None);
                    }
                }
            }
        }
        let (count, cmd) = self.committed_count(index)?;
        if count < min_count {
            bail!(
                "only {} decided for index {}; wanted {}",
                count,
                index,
                min_count
            )
        }
        Ok(Some(cmd))
    }

    /// Submits `cmd` and waits until at least `expected_servers` peers have
    /// committed it, returning the log index it committed at.
    ///
    /// Offers the command to every server in round-robin order; whichever
    /// accepts it yields an index to watch. With `retry`, keeps resubmitting
    /// for up to 10 seconds; otherwise gives up after one 2-second
    /// agreement window.
    pub fn one(
        &self,
        cmd: i32,
        expected_servers: usize,
        retry: bool,
    ) -> Result<usize> {
        let start = Instant::now();
        let mut cnt = 0;
        while start.elapsed() < Duration::from_secs(10) {
            let mut first_index = None;
            for _ in 0..self.server_count {
                // Round-robin over servers; `start` only succeeds on one
                // that accepts commands.
                cnt += 1;
                cnt %= self.server_count;
                let state = self.state.lock();
                if state.connected[cnt] {
                    if let Some(raft) = &state.rafts[cnt] {
                        if let Some((_, index)) = raft.start(cmd) {
                            first_index.replace(index);
                        }
                    }
                }
            }
            if let Some(index) = first_index {
                // Some server accepted the command; give the cluster up to
                // 2 seconds to commit it at that index.
                let agreement_start = Instant::now();
                while agreement_start.elapsed() < Duration::from_secs(2) {
                    let (commit_count, committed_command) =
                        self.committed_count(index)?;
                    if commit_count > 0
                        && commit_count >= expected_servers
                        && committed_command == cmd
                    {
                        return Ok(index);
                    }
                    sleep_millis(20);
                }
                if !retry {
                    break;
                }
            } else {
                // Nobody accepted the command; pause before retrying.
                sleep_millis(50);
            }
        }
        Err(anyhow!("one({}) failed to reach agreement", cmd))
    }

    /// Reattaches server `index` to the simulated network.
    pub fn connect(&self, index: usize) {
        self.set_connect(index, true);
    }

    /// Detaches server `index` from the simulated network.
    pub fn disconnect(&self, index: usize) {
        self.set_connect(index, false);
    }

    /// Enables (`yes = true`) or disables all RPC links between `index` and
    /// every currently-connected server, in both directions.
    pub fn set_connect(&self, index: usize, yes: bool) {
        let mut state = self.state.lock();
        state.connected[index] = yes;
        let mut network = self.network.lock();
        // Outgoing clients.
        for j in 0..self.server_count {
            if state.connected[j] {
                network.set_enable_client(Self::client_name(index, j), yes)
            }
        }
        // Incoming clients.
        for j in 0..self.server_count {
            if state.connected[j] {
                network.set_enable_client(Self::client_name(j, index), yes);
            }
        }
    }

    /// Crashes server `index`: disconnects it, removes it from the network,
    /// snapshots and re-homes its persisted state, and kills the instance.
    /// The saved state lets `start1` restart it later.
    pub fn crash1(&self, index: usize) {
        self.disconnect(index);
        self.network.lock().remove_server(Self::server_name(index));
        let raft = self.state.lock().rafts[index].take();
        // There is a potential race condition here. It can be produced by
        // 1. Leader sends an AppendEntries request to follower.
        // 2. Follower received the request but have not processed it.
        // 3. We removed follower from the network and took a snapshot of the
        // follower's state.
        // 4. Follower appended entries, replied to the leader. Note although
        // the follower is removed from the network, it can still send replies.
        // 5. The leader believes the entries are appended, but they are not.
        let data = self.log.lock().saved[index].read_state();
        // Make sure to give up the log lock before calling external code, which
        // might directly or indirectly block on the log lock, e.g. through
        // the apply command function.
        if let Some(raft) = raft {
            raft.kill();
        }
        // Swap in a fresh persister holding the snapshot, so the dying
        // instance can no longer mutate the state we restart from.
        let mut log = self.log.lock();
        log.saved[index] = Arc::new(persister::Persister::new());
        log.saved[index].save_state(data);
    }

    /// (Re)starts server `index` from its persisted state, first crashing
    /// any instance that is still running.
    pub fn start1(&self, index: usize) -> Result<()> {
        if self.state.lock().rafts[index].is_some() {
            self.crash1(index);
        }
        // Build RPC clients from this server to every peer (self included).
        let mut clients = vec![];
        {
            let mut network = self.network.lock();
            for j in 0..self.server_count {
                clients.push(test_configs::RpcClient::new(network.make_client(
                    Self::client_name(index, j),
                    Self::server_name(j),
                )))
            }
        }
        let persister = self.log.lock().saved[index].clone();
        let log_clone = self.log.clone();
        let raft = Raft::new(
            clients,
            index,
            persister,
            // Every applied command is recorded into the shared log state.
            move |message| {
                Self::apply_command(log_clone.clone(), index, message)
            },
            None,
            Raft::<i32>::NO_SNAPSHOT,
        );
        self.state.lock().rafts[index].replace(raft.clone());
        let raft = Rc::new(raft);
        register_server(raft, Self::server_name(index), self.network.as_ref())?;
        Ok(())
    }

    /// Start a new command, returns (term, index).
    ///
    /// Returns `None` when `Raft::start` rejects the command (presumably
    /// because `leader` is not actually the leader — confirm against
    /// `Raft::start`). Panics if the server at `leader` has been crashed.
    pub fn leader_start(
        &self,
        leader: usize,
        cmd: i32,
    ) -> Option<(usize, usize)> {
        self.state.lock().rafts[leader]
            .as_ref()
            .map(|raft| raft.start(cmd).map(|(term, index)| (term.0, index)))
            .unwrap()
    }

    /// Whether server `index` is attached to the simulated network.
    pub fn is_connected(&self, index: usize) -> bool {
        self.state.lock().connected[index]
    }

    /// Whether server `index` is running (i.e. not crashed).
    pub fn is_server_alive(&self, index: usize) -> bool {
        self.state.lock().rafts[index].is_some()
    }

    /// Total number of RPCs sent on the simulated network so far.
    pub fn total_rpcs(&self) -> usize {
        self.network.lock().get_total_rpc_count()
    }

    /// Turns simulated network unreliability on or off.
    pub fn set_unreliable(&self, yes: bool) {
        // Note the inversion: unreliable = yes means reliable = !yes.
        self.network.lock().set_reliable(!yes);
    }

    /// Turns long message reordering on or off on the network.
    pub fn set_long_reordering(&self, yes: bool) {
        self.network.lock().set_long_reordering(yes);
    }

    /// Marks the end of a test. Currently a no-op.
    pub fn end(&self) {}

    /// Tears everything down: removes all servers from the network, stops
    /// the network daemon, kills every Raft instance, and prints where this
    /// test's log file was written.
    pub fn cleanup(&self) {
        log::trace!("Cleaning up test config ...");
        let mut network = self.network.lock();
        for i in 0..self.server_count {
            network.remove_server(Self::server_name(i));
        }
        network.stop();
        // Release the network lock before killing rafts, which may still be
        // interacting with it.
        drop(network);
        for raft in &mut self.state.lock().rafts {
            if let Some(raft) = raft.take() {
                raft.kill();
            }
        }
        log::trace!("Cleaning up test config done.");
        eprintln!(
            "Ruaft log file for {}: {:?}",
            self.test_path,
            self.log_file_path.as_os_str()
        );
    }
}
impl Config {
    // Sentinel stored in committed-log slots that have not been applied yet.
    const INVALID_COMMAND: i32 = -1;

    /// Apply-channel callback for server `server_index`.
    ///
    /// Records each committed command into the shared log state and checks
    /// two invariants, storing violations into `results`:
    /// 1. no other server committed a different command at the same index;
    /// 2. the slot just below the new index was already applied (no gaps).
    fn apply_command(
        log_state: Arc<Mutex<LogState>>,
        server_index: usize,
        message: ApplyCommandMessage<i32>,
    ) {
        // Only plain commands are recorded here.
        let (index, command) =
            if let ApplyCommandMessage::Command(index, command) = message {
                (index, command)
            } else {
                // Ignore snapshots.
                return;
            };
        let mut log_state = log_state.lock();
        let committed_logs = &mut log_state.committed_logs;
        // Invariant 1: scan all servers for a conflicting commit at `index`
        // before recording ours; remember the first offender.
        let mut err = None;
        for (one_index, one_server) in committed_logs.iter().enumerate() {
            if one_server.len() > index && one_server[index] != command {
                err = Some((
                    one_index,
                    Err(anyhow!(
                        "commit index={} server={} {} != server={} {}",
                        index,
                        server_index,
                        command,
                        one_index,
                        one_server[index],
                    )),
                ));
                break;
            }
        }
        // Record the command, padding any gap with INVALID_COMMAND.
        let one_server = &mut committed_logs[server_index];
        if one_server.len() <= index {
            one_server.resize(index + 1, Self::INVALID_COMMAND);
        }
        one_server[index] = command;
        // Invariant 2: the previous slot must already hold a real command.
        // An out-of-order apply takes precedence over a conflict error.
        if index > 1 && one_server[index - 1] == Self::INVALID_COMMAND {
            log_state.results[server_index] = Err(anyhow!(
                "server {} apply out of order {}",
                server_index,
                index
            ));
        } else if let Some((one_index, err)) = err {
            log_state.results[one_index] = err
        }
        if index > log_state.max_index {
            log_state.max_index = index;
        }
    }
}
/// Builds a [`Config`], labelling it with the enclosing test function's
/// name (via `stdext::function_name!`) so log files are per-test.
#[macro_export]
macro_rules! make_config {
    ($server_count:expr, $unreliable:expr) => {
        $crate::config::make_config(
            $server_count,
            $unreliable,
            stdext::function_name!(),
        )
    };
}
  408. pub fn make_config(
  409. server_count: usize,
  410. unreliable: bool,
  411. test_path: &'static str,
  412. ) -> Config {
  413. // Create a logger first.
  414. let log_file_path = test_utils::init_log(test_path)
  415. .expect("Test log file creation should never fail");
  416. let network = labrpc::Network::run_daemon();
  417. {
  418. let mut unlocked_network = network.lock();
  419. unlocked_network.set_reliable(!unreliable);
  420. unlocked_network.set_long_delays(true);
  421. }
  422. let state = Mutex::new(ConfigState {
  423. rafts: vec![None; server_count],
  424. connected: vec![true; server_count],
  425. });
  426. let mut saved = vec![];
  427. saved.resize_with(server_count, || Arc::new(persister::Persister::new()));
  428. let log = Arc::new(Mutex::new(LogState {
  429. committed_logs: vec![vec![]; server_count],
  430. results: vec![],
  431. max_index: 0,
  432. saved,
  433. }));
  434. log.lock().results.resize_with(server_count, || Ok(()));
  435. let cfg = Config {
  436. network,
  437. server_count,
  438. state,
  439. log,
  440. log_file_path,
  441. test_path,
  442. };
  443. for i in 0..server_count {
  444. cfg.start1(i).expect("Starting server should not fail");
  445. }
  446. cfg
  447. }
  448. pub fn sleep_millis(mills: u64) {
  449. std::thread::sleep(std::time::Duration::from_millis(mills))
  450. }
  451. pub const LONG_ELECTION_TIMEOUT_MILLIS: u64 = 1000;
  452. pub fn sleep_election_timeouts(count: u64) {
  453. sleep_millis(LONG_ELECTION_TIMEOUT_MILLIS * count)
  454. }