config.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. use std::collections::HashMap;
  2. use std::path::PathBuf;
  3. use std::sync::Arc;
  4. use std::time::{Duration, Instant};
  5. pub use anyhow::Result;
  6. use anyhow::{anyhow, bail};
  7. use parking_lot::Mutex;
  8. use rand::{thread_rng, Rng};
  9. use ruaft::{ApplyCommandMessage, Raft, Term};
  10. use crate::register_server;
  11. use crate::utils::{sleep_millis, NO_SNAPSHOT};
  12. use crate::InMemoryStorage;
  13. struct ConfigState {
  14. rafts: Vec<Option<Raft<i32>>>,
  15. connected: Vec<bool>,
  16. }
  17. struct LogState {
  18. committed_logs: Vec<Vec<i32>>,
  19. results: Vec<Result<()>>,
  20. max_index: usize,
  21. saved: Vec<InMemoryStorage>,
  22. }
  23. pub struct Config {
  24. network: Arc<Mutex<labrpc::Network>>,
  25. server_count: usize,
  26. state: Mutex<ConfigState>,
  27. log: Arc<Mutex<LogState>>,
  28. log_file_path: PathBuf,
  29. test_path: &'static str,
  30. }
  31. impl Config {
  32. fn server_name(i: usize) -> String {
  33. format!("ruaft-server-{}", i)
  34. }
  35. fn client_name(client: usize, server: usize) -> String {
  36. format!("ruaft-client-{}-to-{}", client, server)
  37. }
  38. pub fn begin<S: std::fmt::Display>(&self, msg: S) {
  39. eprintln!("{}", msg);
  40. }
  41. pub fn check_one_leader(&self) -> Result<usize> {
  42. for _ in 0..10 {
  43. let millis = 450 + thread_rng().gen_range(0..100);
  44. sleep_millis(millis);
  45. let mut leaders = HashMap::new();
  46. let state = self.state.lock();
  47. for i in 0..self.server_count {
  48. if state.connected[i] {
  49. if let Some(raft) = &state.rafts[i] {
  50. let (term, is_leader) = raft.get_state();
  51. if is_leader {
  52. leaders
  53. .entry(term.0)
  54. .or_insert_with(Vec::new)
  55. .push(i)
  56. }
  57. }
  58. }
  59. }
  60. let mut last_term_with_leader = 0;
  61. let mut last_leader = 0;
  62. for (term, leaders) in leaders {
  63. if leaders.len() > 1 {
  64. bail!("term {} has {} (>1) leaders", term, leaders.len());
  65. }
  66. if term > last_term_with_leader {
  67. last_term_with_leader = term;
  68. last_leader = leaders[0];
  69. }
  70. }
  71. if last_term_with_leader != 0 {
  72. return Ok(last_leader);
  73. }
  74. }
  75. Err(anyhow!("expected one leader, got none"))
  76. }
  77. pub fn check_no_leader(&self) -> Result<()> {
  78. let state = self.state.lock();
  79. for i in 0..self.server_count {
  80. if state.connected[i] {
  81. if let Some(raft) = &state.rafts[i] {
  82. if raft.get_state().1 {
  83. bail!(
  84. "expected no leader, but {} claims to be leader",
  85. i
  86. );
  87. }
  88. }
  89. }
  90. }
  91. Ok(())
  92. }
  93. pub fn check_terms(&self) -> Result<Option<usize>> {
  94. let mut term = None;
  95. let state = self.state.lock();
  96. for i in 0..self.server_count {
  97. if state.connected[i] {
  98. if let Some(raft) = &state.rafts[i] {
  99. let raft_term = raft.get_state().0;
  100. if let Some(term) = term {
  101. if term != raft_term {
  102. bail!("Servers disagree on term")
  103. }
  104. } else {
  105. term.replace(raft_term);
  106. }
  107. }
  108. }
  109. }
  110. // Unwrap type Term into usize.
  111. Ok(term.map(|term| term.0))
  112. }
  113. /// Returns the number of peers that committed at least `index` commands,
  114. /// as well as the command at the index.
  115. pub fn committed_count(&self, index: usize) -> Result<(usize, i32)> {
  116. let mut count = 0;
  117. let mut cmd = Self::INVALID_COMMAND;
  118. for i in 0..self.server_count {
  119. let log = self.log.lock();
  120. if let Err(e) = &log.results[i] {
  121. bail!(e.to_string())
  122. }
  123. if log.committed_logs[i].len() > index {
  124. let command = log.committed_logs[i][index];
  125. if count > 0 && command != cmd {
  126. bail!(
  127. "committed values do not match: index {}, {}, {}",
  128. index,
  129. cmd,
  130. command
  131. )
  132. }
  133. count += 1;
  134. cmd = command;
  135. }
  136. }
  137. Ok((count, cmd))
  138. }
  139. pub fn wait(
  140. &self,
  141. index: usize,
  142. min_count: usize,
  143. at_term: Option<usize>,
  144. ) -> Result<Option<i32>> {
  145. let mut sleep_time_mills = 10;
  146. for _ in 0..30 {
  147. let (count, _) = self.committed_count(index)?;
  148. if count >= min_count {
  149. break;
  150. }
  151. sleep_millis(sleep_time_mills);
  152. if sleep_time_mills < 1000 {
  153. sleep_time_mills <<= 1;
  154. }
  155. if let Some(at_term) = at_term {
  156. let state = self.state.lock();
  157. for raft in state.rafts.iter().flatten() {
  158. let (Term(term), _) = raft.get_state();
  159. if term > at_term {
  160. return Ok(None);
  161. }
  162. }
  163. }
  164. }
  165. let (count, cmd) = self.committed_count(index)?;
  166. if count < min_count {
  167. bail!(
  168. "only {} decided for index {}; wanted {}",
  169. count,
  170. index,
  171. min_count
  172. )
  173. }
  174. Ok(Some(cmd))
  175. }
  176. pub fn one(
  177. &self,
  178. cmd: i32,
  179. expected_servers: usize,
  180. retry: bool,
  181. ) -> Result<usize> {
  182. let start = Instant::now();
  183. let mut cnt = 0;
  184. while start.elapsed() < Duration::from_secs(10) {
  185. let mut first_index = None;
  186. for _ in 0..self.server_count {
  187. cnt += 1;
  188. cnt %= self.server_count;
  189. let state = self.state.lock();
  190. if state.connected[cnt] {
  191. if let Some(raft) = &state.rafts[cnt] {
  192. if let Some(index_term) = raft.start(cmd) {
  193. first_index.replace(index_term.index);
  194. }
  195. }
  196. }
  197. }
  198. if let Some(index) = first_index {
  199. let agreement_start = Instant::now();
  200. while agreement_start.elapsed() < Duration::from_secs(2) {
  201. let (commit_count, committed_command) =
  202. self.committed_count(index)?;
  203. if commit_count > 0
  204. && commit_count >= expected_servers
  205. && committed_command == cmd
  206. {
  207. return Ok(index);
  208. }
  209. sleep_millis(20);
  210. }
  211. if !retry {
  212. break;
  213. }
  214. } else {
  215. sleep_millis(50);
  216. }
  217. }
  218. Err(anyhow!("one({}) failed to reach agreement", cmd))
  219. }
  220. pub fn connect(&self, index: usize) {
  221. self.set_connect(index, true);
  222. }
  223. pub fn disconnect(&self, index: usize) {
  224. self.set_connect(index, false);
  225. }
  226. pub fn set_connect(&self, index: usize, yes: bool) {
  227. let mut state = self.state.lock();
  228. state.connected[index] = yes;
  229. let mut network = self.network.lock();
  230. // Outgoing clients.
  231. for j in 0..self.server_count {
  232. if state.connected[j] {
  233. network.set_enable_client(Self::client_name(index, j), yes)
  234. }
  235. }
  236. // Incoming clients.
  237. for j in 0..self.server_count {
  238. if state.connected[j] {
  239. network.set_enable_client(Self::client_name(j, index), yes);
  240. }
  241. }
  242. }
  243. pub fn connect_pair(&self, one: usize, two: usize) {
  244. let mut state = self.state.lock();
  245. state.connected[one] = true;
  246. state.connected[two] = true;
  247. let mut network = self.network.lock();
  248. network.set_enable_client(Self::client_name(one, two), true);
  249. network.set_enable_client(Self::client_name(two, one), true);
  250. }
  251. pub fn disconnect_pair(&self, one: usize, two: usize) {
  252. let mut network = self.network.lock();
  253. network.set_enable_client(Self::client_name(one, two), false);
  254. network.set_enable_client(Self::client_name(two, one), false);
  255. }
  256. pub fn crash1(&self, index: usize) {
  257. self.disconnect(index);
  258. self.network.lock().remove_server(Self::server_name(index));
  259. let raft = self.state.lock().rafts[index].take();
  260. // There is a potential race condition here. It can be produced by
  261. // 1. Leader sends an AppendEntries request to follower.
  262. // 2. Follower received the request but have not processed it.
  263. // 3. We removed follower from the network and took a snapshot of the
  264. // follower's state.
  265. // 4. Follower appended entries, replied to the leader. Note although
  266. // the follower is removed from the network, it can still send replies.
  267. // 5. The leader believes the entries are appended, but they are not.
  268. // Make sure to give up the log lock before calling external code, which
  269. // might directly or indirectly block on the log lock, e.g. through
  270. // the apply command function.
  271. let Some(raft) = raft else { return };
  272. let data = self.log.lock().saved[index].save();
  273. raft.kill().join();
  274. let mut log = self.log.lock();
  275. let storage = InMemoryStorage::create(usize::MAX);
  276. storage.restore(data);
  277. log.saved[index] = storage;
  278. }
  279. pub fn start1(&self, index: usize) -> Result<()> {
  280. if self.state.lock().rafts[index].is_some() {
  281. self.crash1(index);
  282. }
  283. let mut clients = vec![];
  284. {
  285. let mut network = self.network.lock();
  286. for j in 0..self.server_count {
  287. clients.push(crate::RpcClient::new(network.make_client(
  288. Self::client_name(index, j),
  289. Self::server_name(j),
  290. )))
  291. }
  292. }
  293. let storage = self.log.lock().saved[index].clone();
  294. let log = self.log.clone();
  295. let raft = Raft::new(
  296. clients,
  297. index,
  298. storage,
  299. move |message| Self::apply_command(log.clone(), index, message),
  300. NO_SNAPSHOT,
  301. );
  302. self.state.lock().rafts[index].replace(raft.clone());
  303. register_server(raft, Self::server_name(index), self.network.as_ref())?;
  304. Ok(())
  305. }
  306. /// Start a new command, returns (term, index).
  307. pub fn leader_start(
  308. &self,
  309. leader: usize,
  310. cmd: i32,
  311. ) -> Option<(usize, usize)> {
  312. self.state.lock().rafts[leader]
  313. .as_ref()
  314. .unwrap()
  315. .start(cmd)
  316. .map(|index_term| (index_term.term.0, index_term.index))
  317. }
  318. pub fn is_connected(&self, index: usize) -> bool {
  319. self.state.lock().connected[index]
  320. }
  321. pub fn is_server_alive(&self, index: usize) -> bool {
  322. self.state.lock().rafts[index].is_some()
  323. }
  324. pub fn total_rpcs(&self) -> usize {
  325. self.network.lock().get_total_rpc_count()
  326. }
  327. pub fn set_unreliable(&self, yes: bool) {
  328. self.network.lock().set_reliable(!yes);
  329. }
  330. pub fn set_long_reordering(&self, yes: bool) {
  331. self.network.lock().set_long_reordering(yes);
  332. }
  333. pub fn end(&self) {}
  334. pub fn cleanup(&self) {
  335. log::trace!("Cleaning up test raft.config ...");
  336. let mut network = self.network.lock();
  337. for i in 0..self.server_count {
  338. network.remove_server(Self::server_name(i));
  339. }
  340. network.stop();
  341. drop(network);
  342. for raft in &mut self.state.lock().rafts {
  343. if let Some(raft) = raft.take() {
  344. raft.kill().join();
  345. }
  346. }
  347. log::trace!("Cleaning up test raft.config done.");
  348. eprintln!(
  349. "Ruaft log file for {}: {:?}",
  350. self.test_path,
  351. self.log_file_path.as_os_str()
  352. );
  353. }
  354. }
  355. impl Config {
  356. const INVALID_COMMAND: i32 = -1;
  357. fn apply_command(
  358. log_state: Arc<Mutex<LogState>>,
  359. server_index: usize,
  360. message: ApplyCommandMessage<i32>,
  361. ) {
  362. let (index, command) =
  363. if let ApplyCommandMessage::Command(index, command) = message {
  364. (index, command.unwrap_or(0))
  365. } else {
  366. // Ignore snapshots.
  367. return;
  368. };
  369. let mut log_state = log_state.lock();
  370. let committed_logs = &mut log_state.committed_logs;
  371. let mut err = None;
  372. for (one_index, one_server) in committed_logs.iter().enumerate() {
  373. if one_server.len() > index && one_server[index] != command {
  374. err = Some((
  375. one_index,
  376. Err(anyhow!(
  377. "commit index={} server={} {} != server={} {}",
  378. index,
  379. server_index,
  380. command,
  381. one_index,
  382. one_server[index],
  383. )),
  384. ));
  385. break;
  386. }
  387. }
  388. let one_server = &mut committed_logs[server_index];
  389. if one_server.len() <= index {
  390. one_server.resize(index + 1, Self::INVALID_COMMAND);
  391. }
  392. one_server[index] = command;
  393. if index > 1 && one_server[index - 1] == Self::INVALID_COMMAND {
  394. log_state.results[server_index] = Err(anyhow!(
  395. "server {} apply out of order {}",
  396. server_index,
  397. index
  398. ));
  399. } else if let Some((one_index, err)) = err {
  400. log_state.results[one_index] = err
  401. }
  402. if index > log_state.max_index {
  403. log_state.max_index = index;
  404. }
  405. }
  406. }
  /// Builds a test harness with `make_config`.
  ///
  /// The test path is `stdext::function_name!()` expanded at the call site.
  /// Usage: `make_config!(server_count, unreliable)`.
  #[macro_export]
  macro_rules! make_config {
      ($servers:expr, $unreliable:expr) => {
          $crate::raft::config::make_config(
              $servers,
              $unreliable,
              stdext::function_name!(),
          )
      };
  }
  417. pub fn make_config(
  418. server_count: usize,
  419. unreliable: bool,
  420. test_path: &'static str,
  421. ) -> Config {
  422. // Create a logger first.
  423. let log_file_path = test_utils::init_log(test_path)
  424. .expect("Test log file creation should never fail");
  425. let network = labrpc::Network::run_daemon();
  426. {
  427. let mut unlocked_network = network.lock();
  428. unlocked_network.set_reliable(!unreliable);
  429. unlocked_network.set_long_delays(true);
  430. }
  431. let state = Mutex::new(ConfigState {
  432. rafts: vec![None; server_count],
  433. connected: vec![true; server_count],
  434. });
  435. let mut saved = vec![];
  436. saved.resize_with(server_count, || InMemoryStorage::create(usize::MAX));
  437. let log = Arc::new(Mutex::new(LogState {
  438. committed_logs: vec![vec![]; server_count],
  439. results: vec![],
  440. max_index: 0,
  441. saved,
  442. }));
  443. log.lock().results.resize_with(server_count, || Ok(()));
  444. let cfg = Config {
  445. network,
  446. server_count,
  447. state,
  448. log,
  449. log_file_path,
  450. test_path,
  451. };
  452. for i in 0..server_count {
  453. cfg.start1(i).expect("Starting server should not fail");
  454. }
  455. cfg
  456. }