Kaynağa Gözat

Drop the Arc requirement of Persister.

Originally Arc was required because of tests. The persister implemented in
tests are shared between the Raft instance and the test, as they both need to
access stored data. However in real world, persisters are wrappers to disk
utilities, and are rarely shared. This change makes Ruaft work better in the
real world.

The requirement of tests are addressed by exposing the internal Raft persister
to tests only, as implemented in src/utils/integration_test.rs.

A forced casting has to be done in the test persister implemention.
Jing Yang 3 yıl önce
ebeveyn
işleme
26d05814ba

+ 1 - 1
durio/src/run.rs

@@ -19,7 +19,7 @@ pub(crate) async fn run_kv_instance(
         remote_rafts.push(LazyRaftServiceClient::new(raft_peer));
     }
 
-    let persister = Arc::new(DoNothingPersister::default());
+    let persister = DoNothingPersister::default();
 
     let kv_server = KVServer::new(remote_rafts, me, persister, None);
     let raft = kv_server.raft().clone();

+ 1 - 1
kvraft/src/server.rs

@@ -111,7 +111,7 @@ impl KVServer {
     pub fn new(
         servers: Vec<impl RemoteRaft<UniqueKVOp> + 'static>,
         me: usize,
-        persister: Arc<dyn Persister>,
+        persister: impl Persister + 'static,
         max_state_size_bytes: Option<usize>,
     ) -> Arc<Self> {
         let (tx, rx) = channel();

+ 3 - 2
src/raft.rs

@@ -60,7 +60,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
     pub fn new(
         peers: Vec<impl RemoteRaft<Command> + 'static>,
         me: usize,
-        persister: Arc<dyn Persister>,
+        persister: impl Persister + 'static,
         apply_command: impl ApplyCommandFnMut<Command>,
         max_state_size_bytes: Option<usize>,
         request_snapshot: impl RequestSnapshotFnMut,
@@ -95,6 +95,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         let election = Arc::new(ElectionState::create());
         election.reset_election_timer();
 
+        let persister = Arc::new(persister);
         let term_marker = TermMarker::create(
             inner_state.clone(),
             election.clone(),
@@ -308,7 +309,7 @@ mod tests {
         let raft = Raft::new(
             vec![DoNothingRemoteRaft {}; peer_size],
             me,
-            Arc::new(DoNothingPersister {}),
+            DoNothingPersister {},
             |_: ApplyCommandMessage<i32>| {},
             None,
             |_| {},

+ 9 - 2
src/utils/integration_test.rs

@@ -1,9 +1,10 @@
 #![cfg(feature = "integration-test")]
 
 use crate::{
-    AppendEntriesArgs, AppendEntriesReply, IndexTerm, Peer, RequestVoteArgs,
-    RequestVoteReply, Term,
+    AppendEntriesArgs, AppendEntriesReply, IndexTerm, Peer, Persister, Raft,
+    RequestVoteArgs, RequestVoteReply, Term,
 };
+use std::sync::Arc;
 
 pub fn make_request_vote_args(
     term: Term,
@@ -50,3 +51,9 @@ pub fn unpack_append_entries_args<T>(
 pub fn unpack_append_entries_reply(reply: AppendEntriesReply) -> (Term, bool) {
     (reply.term, reply.success)
 }
+
+impl<Command> Raft<Command> {
+    pub fn persister(&self) -> Arc<dyn Persister> {
+        self.persister.clone()
+    }
+}

+ 3 - 7
test_configs/src/interceptor/mod.rs

@@ -263,7 +263,6 @@ impl Config {
 
 pub fn make_config(server_count: usize, max_state: Option<usize>) -> Config {
     let (event_queue, clients) = make_grid_clients(server_count);
-    let persister = Arc::new(Persister::new());
     let mut kv_servers = vec![];
     let clients: Vec<Vec<&'static InterceptingRpcClient<UniqueKVOp>>> = clients
         .into_iter()
@@ -277,12 +276,9 @@ pub fn make_config(server_count: usize, max_state: Option<usize>) -> Config {
         })
         .collect();
     for (index, client_vec) in clients.iter().enumerate() {
-        let kv_server = KVServer::new(
-            client_vec.to_vec(),
-            index,
-            persister.clone(),
-            max_state,
-        );
+        let persister = Persister::new();
+        let kv_server =
+            KVServer::new(client_vec.to_vec(), index, persister, max_state);
         kv_servers.push(kv_server);
     }
 

+ 18 - 11
test_configs/src/kvraft/config.rs

@@ -19,7 +19,7 @@ pub struct Config {
     network: Arc<Mutex<labrpc::Network>>,
     server_count: usize,
     state: Mutex<ConfigState>,
-    storage: Mutex<Vec<Arc<Persister>>>,
+    storage: Mutex<Vec<Option<Persister>>>,
     maxraftstate: usize,
 }
 
@@ -52,7 +52,9 @@ impl Config {
             }
         }
 
-        let persister = self.storage.lock()[index].clone();
+        let persister = self.storage.lock()[index]
+            .take()
+            .expect("A persister must be present to create a raft server");
 
         let kv =
             KVServer::new(clients, index, persister, Some(self.maxraftstate));
@@ -150,14 +152,14 @@ impl Config {
             network.remove_server(Self::kv_server_name(index));
         }
 
-        let data = self.storage.lock()[index].read();
-
-        let persister = Arc::new(Persister::new());
-        self.storage.lock()[index] = persister.clone();
-        persister.restore(data);
-
         if let Some(kv_server) = self.state.lock().kv_servers[index].take() {
+            let persister = kv_server.raft().persister();
+            let data = Persister::downcast_unsafe(persister.as_ref()).read();
             kv_server.kill();
+
+            let persister = Persister::new();
+            persister.restore(data);
+            self.storage.lock()[index] = Some(persister);
         }
     }
 
@@ -254,8 +256,13 @@ impl Config {
         size_fn: impl Fn(&Persister) -> usize,
     ) -> Result<(), String> {
         let mut over_limits = String::new();
-        for (index, p) in self.storage.lock().iter().enumerate() {
-            let size = size_fn(p);
+        for (index, p) in self.state.lock().kv_servers.iter().enumerate() {
+            let p = p
+                .as_ref()
+                .expect("KV server must be running to check size")
+                .raft()
+                .persister();
+            let size = size_fn(Persister::downcast_unsafe(p.as_ref()));
             if size > upper {
                 let str = format!(" (index {}, size {})", index, size);
                 over_limits.push_str(&str);
@@ -299,7 +306,7 @@ pub fn make_config(
     let storage = Mutex::new(vec![]);
     storage
         .lock()
-        .resize_with(server_count, || Arc::new(Persister::new()));
+        .resize_with(server_count, || Some(Persister::new()));
 
     let cfg = Config {
         network,

+ 4 - 0
test_configs/src/persister.rs

@@ -59,4 +59,8 @@ impl Persister {
     pub fn snapshot_size(&self) -> usize {
         self.state.lock().snapshot.len()
     }
+
+    pub fn downcast_unsafe(trait_obj: &dyn ruaft::Persister) -> &Self {
+        unsafe { &*(trait_obj as *const dyn ruaft::Persister as *const Self) }
+    }
 }

+ 14 - 9
test_configs/src/raft/config.rs

@@ -22,7 +22,7 @@ struct LogState {
     committed_logs: Vec<Vec<i32>>,
     results: Vec<Result<()>>,
     max_index: usize,
-    saved: Vec<Arc<crate::Persister>>,
+    saved: Vec<Option<crate::Persister>>,
 }
 
 pub struct Config {
@@ -297,16 +297,19 @@ impl Config {
         // 4. Follower appended entries, replied to the leader. Note although
         // the follower is removed from the network, it can still send replies.
         // 5. The leader believes the entries are appended, but they are not.
-        let data = self.log.lock().saved[index].read_state();
+
         // Make sure to give up the log lock before calling external code, which
         // might directly or indirectly block on the log lock, e.g. through
         // the apply command function.
-        if let Some(raft) = raft {
-            raft.kill().join();
-        }
+        let Some(raft) = raft else { return };
+
+        let data = raft.persister().read_state();
+        raft.kill().join();
+
         let mut log = self.log.lock();
-        log.saved[index] = Arc::new(crate::Persister::new());
-        log.saved[index].save_state(data);
+        let persister = crate::Persister::new();
+        persister.save_state(data);
+        log.saved[index] = Some(persister);
     }
 
     pub fn start1(&self, index: usize) -> Result<()> {
@@ -324,7 +327,9 @@ impl Config {
                 )))
             }
         }
-        let persister = self.log.lock().saved[index].clone();
+        let persister = self.log.lock().saved[index]
+            .take()
+            .expect("A persister must be present to create a raft server");
 
         let log = self.log.clone();
         let raft = Raft::new(
@@ -488,7 +493,7 @@ pub fn make_config(
     });
 
     let mut saved = vec![];
-    saved.resize_with(server_count, || Arc::new(crate::Persister::new()));
+    saved.resize_with(server_count, || Some(crate::Persister::new()));
     let log = Arc::new(Mutex::new(LogState {
         committed_logs: vec![vec![]; server_count],
         results: vec![],