Просмотр исходного кода

Drop the Arc requirement of Persister.

Originally Arc was required because of tests. The persister implemented in
tests are shared between the Raft instance and the test, as they both need to
access stored data. However in real world, persisters are wrappers to disk
utilities, and are rarely shared. This change makes Ruaft work better in the
real world.

The requirement of tests are addressed by exposing the internal Raft persister
to tests only, as implemented in src/utils/integration_test.rs.

A forced casting has to be done in the test persister implemention.
Jing Yang 3 лет назад
Родитель
Сommit
44774c320f

+ 1 - 1
durio/src/run.rs

@@ -19,7 +19,7 @@ pub(crate) async fn run_kv_instance(
         remote_rafts.push(LazyRaftServiceClient::new(raft_peer));
         remote_rafts.push(LazyRaftServiceClient::new(raft_peer));
     }
     }
 
 
-    let persister = Arc::new(DoNothingPersister::default());
+    let persister = DoNothingPersister::default();
 
 
     let kv_server = KVServer::new(remote_rafts, me, persister, None);
     let kv_server = KVServer::new(remote_rafts, me, persister, None);
     let raft = kv_server.raft().clone();
     let raft = kv_server.raft().clone();

+ 1 - 1
kvraft/src/server.rs

@@ -111,7 +111,7 @@ impl KVServer {
     pub fn new(
     pub fn new(
         servers: Vec<impl RemoteRaft<UniqueKVOp> + 'static>,
         servers: Vec<impl RemoteRaft<UniqueKVOp> + 'static>,
         me: usize,
         me: usize,
-        persister: Arc<dyn Persister>,
+        persister: impl Persister + 'static,
         max_state_size_bytes: Option<usize>,
         max_state_size_bytes: Option<usize>,
     ) -> Arc<Self> {
     ) -> Arc<Self> {
         let (tx, rx) = channel();
         let (tx, rx) = channel();

+ 3 - 2
src/raft.rs

@@ -60,7 +60,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
     pub fn new(
     pub fn new(
         peers: Vec<impl RemoteRaft<Command> + 'static>,
         peers: Vec<impl RemoteRaft<Command> + 'static>,
         me: usize,
         me: usize,
-        persister: Arc<dyn Persister>,
+        persister: impl Persister + 'static,
         apply_command: impl ApplyCommandFnMut<Command>,
         apply_command: impl ApplyCommandFnMut<Command>,
         max_state_size_bytes: Option<usize>,
         max_state_size_bytes: Option<usize>,
         request_snapshot: impl RequestSnapshotFnMut,
         request_snapshot: impl RequestSnapshotFnMut,
@@ -95,6 +95,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         let election = Arc::new(ElectionState::create());
         let election = Arc::new(ElectionState::create());
         election.reset_election_timer();
         election.reset_election_timer();
 
 
+        let persister = Arc::new(persister);
         let term_marker = TermMarker::create(
         let term_marker = TermMarker::create(
             inner_state.clone(),
             inner_state.clone(),
             election.clone(),
             election.clone(),
@@ -308,7 +309,7 @@ mod tests {
         let raft = Raft::new(
         let raft = Raft::new(
             vec![DoNothingRemoteRaft {}; peer_size],
             vec![DoNothingRemoteRaft {}; peer_size],
             me,
             me,
-            Arc::new(DoNothingPersister {}),
+            DoNothingPersister {},
             |_: ApplyCommandMessage<i32>| {},
             |_: ApplyCommandMessage<i32>| {},
             None,
             None,
             |_| {},
             |_| {},

+ 9 - 2
src/utils/integration_test.rs

@@ -1,9 +1,10 @@
 #![cfg(feature = "integration-test")]
 #![cfg(feature = "integration-test")]
 
 
 use crate::{
 use crate::{
-    AppendEntriesArgs, AppendEntriesReply, IndexTerm, Peer, RequestVoteArgs,
-    RequestVoteReply, Term,
+    AppendEntriesArgs, AppendEntriesReply, IndexTerm, Peer, Persister, Raft,
+    RequestVoteArgs, RequestVoteReply, Term,
 };
 };
+use std::sync::Arc;
 
 
 pub fn make_request_vote_args(
 pub fn make_request_vote_args(
     term: Term,
     term: Term,
@@ -50,3 +51,9 @@ pub fn unpack_append_entries_args<T>(
 pub fn unpack_append_entries_reply(reply: AppendEntriesReply) -> (Term, bool) {
 pub fn unpack_append_entries_reply(reply: AppendEntriesReply) -> (Term, bool) {
     (reply.term, reply.success)
     (reply.term, reply.success)
 }
 }
+
+impl<Command> Raft<Command> {
+    pub fn persister(&self) -> Arc<dyn Persister> {
+        self.persister.clone()
+    }
+}

+ 3 - 7
test_configs/src/interceptor/mod.rs

@@ -263,7 +263,6 @@ impl Config {
 
 
 pub fn make_config(server_count: usize, max_state: Option<usize>) -> Config {
 pub fn make_config(server_count: usize, max_state: Option<usize>) -> Config {
     let (event_queue, clients) = make_grid_clients(server_count);
     let (event_queue, clients) = make_grid_clients(server_count);
-    let persister = Arc::new(Persister::new());
     let mut kv_servers = vec![];
     let mut kv_servers = vec![];
     let clients: Vec<Vec<&'static InterceptingRpcClient<UniqueKVOp>>> = clients
     let clients: Vec<Vec<&'static InterceptingRpcClient<UniqueKVOp>>> = clients
         .into_iter()
         .into_iter()
@@ -277,12 +276,9 @@ pub fn make_config(server_count: usize, max_state: Option<usize>) -> Config {
         })
         })
         .collect();
         .collect();
     for (index, client_vec) in clients.iter().enumerate() {
     for (index, client_vec) in clients.iter().enumerate() {
-        let kv_server = KVServer::new(
-            client_vec.to_vec(),
-            index,
-            persister.clone(),
-            max_state,
-        );
+        let persister = Persister::new();
+        let kv_server =
+            KVServer::new(client_vec.to_vec(), index, persister, max_state);
         kv_servers.push(kv_server);
         kv_servers.push(kv_server);
     }
     }
 
 

+ 18 - 11
test_configs/src/kvraft/config.rs

@@ -19,7 +19,7 @@ pub struct Config {
     network: Arc<Mutex<labrpc::Network>>,
     network: Arc<Mutex<labrpc::Network>>,
     server_count: usize,
     server_count: usize,
     state: Mutex<ConfigState>,
     state: Mutex<ConfigState>,
-    storage: Mutex<Vec<Arc<Persister>>>,
+    storage: Mutex<Vec<Option<Persister>>>,
     maxraftstate: usize,
     maxraftstate: usize,
 }
 }
 
 
@@ -52,7 +52,9 @@ impl Config {
             }
             }
         }
         }
 
 
-        let persister = self.storage.lock()[index].clone();
+        let persister = self.storage.lock()[index]
+            .take()
+            .expect("A persister must be present to create a raft server");
 
 
         let kv =
         let kv =
             KVServer::new(clients, index, persister, Some(self.maxraftstate));
             KVServer::new(clients, index, persister, Some(self.maxraftstate));
@@ -150,14 +152,14 @@ impl Config {
             network.remove_server(Self::kv_server_name(index));
             network.remove_server(Self::kv_server_name(index));
         }
         }
 
 
-        let data = self.storage.lock()[index].read();
-
-        let persister = Arc::new(Persister::new());
-        self.storage.lock()[index] = persister.clone();
-        persister.restore(data);
-
         if let Some(kv_server) = self.state.lock().kv_servers[index].take() {
         if let Some(kv_server) = self.state.lock().kv_servers[index].take() {
+            let persister = kv_server.raft().persister();
+            let data = Persister::downcast_unsafe(persister.as_ref()).read();
             kv_server.kill();
             kv_server.kill();
+
+            let persister = Persister::new();
+            persister.restore(data);
+            self.storage.lock()[index] = Some(persister);
         }
         }
     }
     }
 
 
@@ -254,8 +256,13 @@ impl Config {
         size_fn: impl Fn(&Persister) -> usize,
         size_fn: impl Fn(&Persister) -> usize,
     ) -> Result<(), String> {
     ) -> Result<(), String> {
         let mut over_limits = String::new();
         let mut over_limits = String::new();
-        for (index, p) in self.storage.lock().iter().enumerate() {
-            let size = size_fn(p);
+        for (index, p) in self.state.lock().kv_servers.iter().enumerate() {
+            let p = p
+                .as_ref()
+                .expect("KV server must be running to check size")
+                .raft()
+                .persister();
+            let size = size_fn(Persister::downcast_unsafe(p.as_ref()));
             if size > upper {
             if size > upper {
                 let str = format!(" (index {}, size {})", index, size);
                 let str = format!(" (index {}, size {})", index, size);
                 over_limits.push_str(&str);
                 over_limits.push_str(&str);
@@ -299,7 +306,7 @@ pub fn make_config(
     let storage = Mutex::new(vec![]);
     let storage = Mutex::new(vec![]);
     storage
     storage
         .lock()
         .lock()
-        .resize_with(server_count, || Arc::new(Persister::new()));
+        .resize_with(server_count, || Some(Persister::new()));
 
 
     let cfg = Config {
     let cfg = Config {
         network,
         network,

+ 4 - 0
test_configs/src/persister.rs

@@ -59,4 +59,8 @@ impl Persister {
     pub fn snapshot_size(&self) -> usize {
     pub fn snapshot_size(&self) -> usize {
         self.state.lock().snapshot.len()
         self.state.lock().snapshot.len()
     }
     }
+
+    pub fn downcast_unsafe(trait_obj: &dyn ruaft::Persister) -> &Self {
+        unsafe { &*(trait_obj as *const dyn ruaft::Persister as *const Self) }
+    }
 }
 }

+ 14 - 9
test_configs/src/raft/config.rs

@@ -22,7 +22,7 @@ struct LogState {
     committed_logs: Vec<Vec<i32>>,
     committed_logs: Vec<Vec<i32>>,
     results: Vec<Result<()>>,
     results: Vec<Result<()>>,
     max_index: usize,
     max_index: usize,
-    saved: Vec<Arc<crate::Persister>>,
+    saved: Vec<Option<crate::Persister>>,
 }
 }
 
 
 pub struct Config {
 pub struct Config {
@@ -297,16 +297,19 @@ impl Config {
         // 4. Follower appended entries, replied to the leader. Note although
         // 4. Follower appended entries, replied to the leader. Note although
         // the follower is removed from the network, it can still send replies.
         // the follower is removed from the network, it can still send replies.
         // 5. The leader believes the entries are appended, but they are not.
         // 5. The leader believes the entries are appended, but they are not.
-        let data = self.log.lock().saved[index].read_state();
+
         // Make sure to give up the log lock before calling external code, which
         // Make sure to give up the log lock before calling external code, which
         // might directly or indirectly block on the log lock, e.g. through
         // might directly or indirectly block on the log lock, e.g. through
         // the apply command function.
         // the apply command function.
-        if let Some(raft) = raft {
-            raft.kill().join();
-        }
+        let Some(raft) = raft else { return };
+
+        let data = raft.persister().read_state();
+        raft.kill().join();
+
         let mut log = self.log.lock();
         let mut log = self.log.lock();
-        log.saved[index] = Arc::new(crate::Persister::new());
-        log.saved[index].save_state(data);
+        let persister = crate::Persister::new();
+        persister.save_state(data);
+        log.saved[index] = Some(persister);
     }
     }
 
 
     pub fn start1(&self, index: usize) -> Result<()> {
     pub fn start1(&self, index: usize) -> Result<()> {
@@ -324,7 +327,9 @@ impl Config {
                 )))
                 )))
             }
             }
         }
         }
-        let persister = self.log.lock().saved[index].clone();
+        let persister = self.log.lock().saved[index]
+            .take()
+            .expect("A persister must be present to create a raft server");
 
 
         let log = self.log.clone();
         let log = self.log.clone();
         let raft = Raft::new(
         let raft = Raft::new(
@@ -488,7 +493,7 @@ pub fn make_config(
     });
     });
 
 
     let mut saved = vec![];
     let mut saved = vec![];
-    saved.resize_with(server_count, || Arc::new(crate::Persister::new()));
+    saved.resize_with(server_count, || Some(crate::Persister::new()));
     let log = Arc::new(Mutex::new(LogState {
     let log = Arc::new(Mutex::new(LogState {
         committed_logs: vec![vec![]; server_count],
         committed_logs: vec![vec![]; server_count],
         results: vec![],
         results: vec![],

+ 1 - 3
test_configs/src/rpcs.rs

@@ -216,8 +216,6 @@ pub fn register_kv_server<
 
 
 #[cfg(test)]
 #[cfg(test)]
 mod tests {
 mod tests {
-    use std::sync::Arc;
-
     use bytes::Bytes;
     use bytes::Bytes;
 
 
     use ruaft::utils::integration_test::{
     use ruaft::utils::integration_test::{
@@ -258,7 +256,7 @@ mod tests {
             let raft = Raft::new(
             let raft = Raft::new(
                 vec![RpcClient(client)],
                 vec![RpcClient(client)],
                 0,
                 0,
-                Arc::new(DoNothingPersister),
+                DoNothingPersister,
                 |_: ApplyCommandMessage<i32>| {},
                 |_: ApplyCommandMessage<i32>| {},
                 None,
                 None,
                 crate::utils::NO_SNAPSHOT,
                 crate::utils::NO_SNAPSHOT,