server.rs 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. use super::common::UniqueId;
  2. use parking_lot::Mutex;
  3. use ruaft::{Persister, Raft, RpcClient};
  4. use std::collections::HashMap;
  5. use std::sync::atomic::AtomicBool;
  6. use std::sync::mpsc::{channel, Receiver};
  7. use std::sync::Arc;
  8. struct KVServer {
  9. state: Mutex<KVServerState>,
  10. rf: Raft<KVOp>,
  11. command_channel: Receiver<(usize, KVOp)>,
  12. shutdown: AtomicBool,
  13. // snapshot
  14. }
  15. #[derive(Clone, Default, Serialize, Deserialize)]
  16. struct KVOp {
  17. unique_id: UniqueId,
  18. }
  19. #[derive(Default)]
  20. struct KVServerState {
  21. kv: HashMap<String, String>,
  22. debug_kv: HashMap<String, String>,
  23. applied_op: HashMap<UniqueId, KVOp>,
  24. }
  25. impl KVServer {
  26. pub fn new(
  27. servers: Vec<RpcClient>,
  28. me: usize,
  29. persister: Arc<dyn Persister>,
  30. ) -> Self {
  31. let (tx, rx) = channel();
  32. let apply_command = move |index, command| {
  33. tx.send((index, command))
  34. .expect("The receiving end of apply command channel should have not been dropped");
  35. };
  36. Self {
  37. state: Default::default(),
  38. rf: Raft::new(
  39. servers,
  40. me,
  41. persister,
  42. apply_command,
  43. None,
  44. Raft::<KVOp>::NO_SNAPSHOT,
  45. ),
  46. command_channel: rx,
  47. shutdown: AtomicBool::new(false),
  48. }
  49. }
  50. }