mod.rs 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. extern crate labrpc;
  2. use parking_lot::Mutex;
  3. use ruaft::rpcs::register_server;
  4. use ruaft::{Raft, RpcClient};
  5. use std::sync::Arc;
  6. struct ConfigState {
  7. rafts: Vec<Option<Raft>>,
  8. }
  9. pub struct Config {
  10. network: Arc<std::sync::Mutex<labrpc::Network>>,
  11. server_count: usize,
  12. state: Mutex<ConfigState>,
  13. }
  14. impl Config {
  15. fn server_name(i: usize) -> String {
  16. format!("ruaft-server-{}", i)
  17. }
  18. fn client_name(client: usize, server: usize) -> String {
  19. format!("ruaft-server-{}-to-{}", client, server)
  20. }
  21. pub fn begin<S: std::fmt::Display>(&self, msg: S) {
  22. eprintln!("{}", msg);
  23. }
  24. pub fn check_one_leader(&self) -> std::io::Result<()> {
  25. Ok(())
  26. }
  27. pub fn check_terms(&self) -> std::io::Result<()> {
  28. Ok(())
  29. }
  30. pub fn disconnect(&self, index: usize) {
  31. let mut network = unlock(&self.network);
  32. network.remove_server(&Self::server_name(index));
  33. // Outgoing clients.
  34. for j in 0..self.server_count {
  35. network.set_enable_client(Self::client_name(index, j), false)
  36. }
  37. // Incoming clients.
  38. for j in 0..self.server_count {
  39. network.set_enable_client(Self::client_name(j, index), false);
  40. }
  41. }
  42. pub fn crash1(&mut self, index: usize) {
  43. self.disconnect(index);
  44. unlock(self.network.as_ref()).remove_server(Self::server_name(index));
  45. let raft = {
  46. let mut state = self.state.lock();
  47. state.rafts[index].take()
  48. };
  49. if let Some(raft) = raft {
  50. raft.kill();
  51. }
  52. }
  53. pub fn start1(&mut self, index: usize) -> std::io::Result<()> {
  54. if self.state.lock().rafts[index].is_some() {
  55. self.crash1(index);
  56. }
  57. let mut clients = vec![];
  58. {
  59. let mut network = unlock(&self.network);
  60. for j in 0..self.server_count {
  61. clients.push(RpcClient::new(network.make_client(
  62. Self::client_name(index, j),
  63. Self::server_name(j),
  64. )))
  65. }
  66. }
  67. let raft = Raft::new(clients, index, |_, _| {});
  68. self.state.lock().rafts[index].replace(raft.clone());
  69. let raft = Arc::new(raft);
  70. register_server(raft, Self::server_name(index), self.network.as_ref())
  71. }
  72. pub fn end(&self) {}
  73. pub fn cleanup(&self) {
  74. for raft in &mut self.state.lock().rafts {
  75. if let Some(raft) = raft.take() {
  76. raft.kill();
  77. }
  78. }
  79. }
  80. }
  81. fn unlock<T>(locked: &std::sync::Mutex<T>) -> std::sync::MutexGuard<T> {
  82. locked.lock().expect("Unlocking network should not fail")
  83. }
  84. pub fn make_config(server_count: usize, unreliable: bool) -> Config {
  85. let network = labrpc::Network::run_daemon();
  86. {
  87. let mut unlocked_network = unlock(&network);
  88. unlocked_network.set_reliable(!unreliable);
  89. unlocked_network.set_long_delays(true);
  90. }
  91. let state = Mutex::new(ConfigState {
  92. rafts: vec![None; server_count],
  93. });
  94. let mut cfg = Config {
  95. network,
  96. server_count,
  97. state,
  98. };
  99. for i in 0..server_count {
  100. cfg.start1(i).expect("Starting server should not fail");
  101. }
  102. cfg
  103. }
  104. pub fn sleep_millis(mills: u64) {
  105. std::thread::sleep(std::time::Duration::from_millis(mills))
  106. }
  107. const LONG_ELECTION_TIMEOUT_MILLIS: u64 = 1000;
  108. pub fn sleep_election_timeouts(count: u64) {
  109. sleep_millis(LONG_ELECTION_TIMEOUT_MILLIS * count)
  110. }