config.rs 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. pub use anyhow::Result;
  2. use client::Clerk;
  3. use parking_lot::Mutex;
  4. use ruaft::rpcs::register_server;
  5. use ruaft::RpcClient;
  6. use server::KVServer;
  7. use std::sync::Arc;
  8. use testing_utils::memory_persister::MemoryStorage;
  9. use testing_utils::rpcs::register_kv_server;
  10. struct ConfigState {
  11. kv_servers: Vec<Option<Arc<KVServer>>>,
  12. clerks: Vec<Option<Clerk>>,
  13. }
  14. pub struct Config {
  15. network: Arc<Mutex<labrpc::Network>>,
  16. server_count: usize,
  17. state: Mutex<ConfigState>,
  18. storage: MemoryStorage,
  19. maxraftstate: usize,
  20. }
  21. impl Config {
  22. fn kv_server_name(i: usize) -> String {
  23. format!("kv-server-{}", i)
  24. }
  25. fn server_name(i: usize) -> String {
  26. format!("kvraft-server-{}", i)
  27. }
  28. fn client_name(client: usize, server: usize) -> String {
  29. format!("kvraft-client-{}-to-{}", client, server)
  30. }
  31. fn start_server(&self, index: usize) -> Result<()> {
  32. let mut clients = vec![];
  33. {
  34. let mut network = self.network.lock();
  35. for j in 0..self.server_count {
  36. clients.push(RpcClient::new(network.make_client(
  37. Self::client_name(index, j),
  38. Self::server_name(j),
  39. )))
  40. }
  41. }
  42. let persister = self.storage.at(index);
  43. let kv = KVServer::new(clients, index, persister);
  44. self.state.lock().kv_servers[index].replace(kv.clone());
  45. let raft = std::rc::Rc::new(kv.raft());
  46. register_server(raft, Self::server_name(index), self.network.as_ref())?;
  47. register_kv_server(
  48. kv,
  49. Self::kv_server_name(index),
  50. self.network.as_ref(),
  51. )?;
  52. Ok(())
  53. }
  54. }
  55. pub fn make_config(
  56. server_count: usize,
  57. unreliable: bool,
  58. maxraftstate: usize,
  59. ) -> Config {
  60. let network = labrpc::Network::run_daemon();
  61. {
  62. let mut unlocked_network = network.lock();
  63. unlocked_network.set_reliable(!unreliable);
  64. unlocked_network.set_long_delays(true);
  65. }
  66. let state = Mutex::new(ConfigState {
  67. kv_servers: vec![None; server_count],
  68. clerks: vec![],
  69. });
  70. let mut storage = MemoryStorage::default();
  71. for _ in 0..server_count {
  72. storage.make();
  73. }
  74. let cfg = Config {
  75. network,
  76. server_count,
  77. state,
  78. storage,
  79. maxraftstate,
  80. };
  81. for i in 0..server_count {
  82. cfg.start_server(i)
  83. .expect("Starting server should not fail");
  84. }
  85. cfg
  86. }