2 次代碼提交 d74fadfbce ... bd32531d58

作者 SHA1 備註 提交日期
  Jing Yang bd32531d58 Optimize serialization of Vec<u8> and boost throughput to 150%. 3 年之前
  Jing Yang 26d05814ba Drop the Arc requirement of Persister. 3 年之前

+ 1 - 0
Cargo.toml

@@ -25,6 +25,7 @@ parking_lot = "0.12"
 rand = "0.8"
 serde = "1.0"
 serde_derive = "1.0"
+serde_bytes = "0.11.9"
 tokio = { version = "1.7", features = ["net", "rt-multi-thread", "sync", "time", "parking_lot"] }
 test_utils = { path = "test_utils", optional = true }
 

+ 1 - 1
durio/src/run.rs

@@ -19,7 +19,7 @@ pub(crate) async fn run_kv_instance(
         remote_rafts.push(LazyRaftServiceClient::new(raft_peer));
     }
 
-    let persister = Arc::new(DoNothingPersister::default());
+    let persister = DoNothingPersister::default();
 
     let kv_server = KVServer::new(remote_rafts, me, persister, None);
     let raft = kv_server.raft().clone();

+ 1 - 1
kvraft/src/server.rs

@@ -111,7 +111,7 @@ impl KVServer {
     pub fn new(
         servers: Vec<impl RemoteRaft<UniqueKVOp> + 'static>,
         me: usize,
-        persister: Arc<dyn Persister>,
+        persister: impl Persister + 'static,
         max_state_size_bytes: Option<usize>,
     ) -> Arc<Self> {
         let (tx, rx) = channel();

+ 1 - 0
src/log_array.rs

@@ -46,6 +46,7 @@ pub struct LogEntry<Command> {
 #[derive(Clone, Serialize, Deserialize)]
 pub(crate) struct LogArray<C> {
     inner: Vec<LogEntry<C>>,
+    #[serde(with = "serde_bytes")]
     snapshot: Vec<u8>,
 }
 

+ 1 - 1
src/messages.rs

@@ -42,7 +42,7 @@ pub struct InstallSnapshotArgs {
     pub(crate) leader_id: Peer,
     pub(crate) last_included_index: Index,
     pub(crate) last_included_term: Term,
-    // TODO(ditsing): Serde cannot handle Vec<u8> as efficient as expected.
+    #[serde(with = "serde_bytes")]
     pub(crate) data: Vec<u8>,
     pub(crate) offset: usize,
     pub(crate) done: bool,

+ 3 - 2
src/raft.rs

@@ -60,7 +60,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
     pub fn new(
         peers: Vec<impl RemoteRaft<Command> + 'static>,
         me: usize,
-        persister: Arc<dyn Persister>,
+        persister: impl Persister + 'static,
         apply_command: impl ApplyCommandFnMut<Command>,
         max_state_size_bytes: Option<usize>,
         request_snapshot: impl RequestSnapshotFnMut,
@@ -95,6 +95,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         let election = Arc::new(ElectionState::create());
         election.reset_election_timer();
 
+        let persister = Arc::new(persister);
         let term_marker = TermMarker::create(
             inner_state.clone(),
             election.clone(),
@@ -308,7 +309,7 @@ mod tests {
         let raft = Raft::new(
             vec![DoNothingRemoteRaft {}; peer_size],
             me,
-            Arc::new(DoNothingPersister {}),
+            DoNothingPersister {},
             |_: ApplyCommandMessage<i32>| {},
             None,
             |_| {},

+ 9 - 2
src/utils/integration_test.rs

@@ -1,9 +1,10 @@
 #![cfg(feature = "integration-test")]
 
 use crate::{
-    AppendEntriesArgs, AppendEntriesReply, IndexTerm, Peer, RequestVoteArgs,
-    RequestVoteReply, Term,
+    AppendEntriesArgs, AppendEntriesReply, IndexTerm, Peer, Persister, Raft,
+    RequestVoteArgs, RequestVoteReply, Term,
 };
+use std::sync::Arc;
 
 pub fn make_request_vote_args(
     term: Term,
@@ -50,3 +51,9 @@ pub fn unpack_append_entries_args<T>(
 pub fn unpack_append_entries_reply(reply: AppendEntriesReply) -> (Term, bool) {
     (reply.term, reply.success)
 }
+
+impl<Command> Raft<Command> {
+    pub fn persister(&self) -> Arc<dyn Persister> {
+        self.persister.clone()
+    }
+}

+ 3 - 7
test_configs/src/interceptor/mod.rs

@@ -263,7 +263,6 @@ impl Config {
 
 pub fn make_config(server_count: usize, max_state: Option<usize>) -> Config {
     let (event_queue, clients) = make_grid_clients(server_count);
-    let persister = Arc::new(Persister::new());
     let mut kv_servers = vec![];
     let clients: Vec<Vec<&'static InterceptingRpcClient<UniqueKVOp>>> = clients
         .into_iter()
@@ -277,12 +276,9 @@ pub fn make_config(server_count: usize, max_state: Option<usize>) -> Config {
         })
         .collect();
     for (index, client_vec) in clients.iter().enumerate() {
-        let kv_server = KVServer::new(
-            client_vec.to_vec(),
-            index,
-            persister.clone(),
-            max_state,
-        );
+        let persister = Persister::new();
+        let kv_server =
+            KVServer::new(client_vec.to_vec(), index, persister, max_state);
         kv_servers.push(kv_server);
     }
 

+ 18 - 11
test_configs/src/kvraft/config.rs

@@ -19,7 +19,7 @@ pub struct Config {
     network: Arc<Mutex<labrpc::Network>>,
     server_count: usize,
     state: Mutex<ConfigState>,
-    storage: Mutex<Vec<Arc<Persister>>>,
+    storage: Mutex<Vec<Option<Persister>>>,
     maxraftstate: usize,
 }
 
@@ -52,7 +52,9 @@ impl Config {
             }
         }
 
-        let persister = self.storage.lock()[index].clone();
+        let persister = self.storage.lock()[index]
+            .take()
+            .expect("A persister must be present to create a raft server");
 
         let kv =
             KVServer::new(clients, index, persister, Some(self.maxraftstate));
@@ -150,14 +152,14 @@ impl Config {
             network.remove_server(Self::kv_server_name(index));
         }
 
-        let data = self.storage.lock()[index].read();
-
-        let persister = Arc::new(Persister::new());
-        self.storage.lock()[index] = persister.clone();
-        persister.restore(data);
-
         if let Some(kv_server) = self.state.lock().kv_servers[index].take() {
+            let persister = kv_server.raft().persister();
+            let data = Persister::downcast_unsafe(persister.as_ref()).read();
             kv_server.kill();
+
+            let persister = Persister::new();
+            persister.restore(data);
+            self.storage.lock()[index] = Some(persister);
         }
     }
 
@@ -254,8 +256,13 @@ impl Config {
         size_fn: impl Fn(&Persister) -> usize,
     ) -> Result<(), String> {
         let mut over_limits = String::new();
-        for (index, p) in self.storage.lock().iter().enumerate() {
-            let size = size_fn(p);
+        for (index, p) in self.state.lock().kv_servers.iter().enumerate() {
+            let p = p
+                .as_ref()
+                .expect("KV server must be running to check size")
+                .raft()
+                .persister();
+            let size = size_fn(Persister::downcast_unsafe(p.as_ref()));
             if size > upper {
                 let str = format!(" (index {}, size {})", index, size);
                 over_limits.push_str(&str);
@@ -299,7 +306,7 @@ pub fn make_config(
     let storage = Mutex::new(vec![]);
     storage
         .lock()
-        .resize_with(server_count, || Arc::new(Persister::new()));
+        .resize_with(server_count, || Some(Persister::new()));
 
     let cfg = Config {
         network,

+ 4 - 0
test_configs/src/persister.rs

@@ -59,4 +59,8 @@ impl Persister {
     pub fn snapshot_size(&self) -> usize {
         self.state.lock().snapshot.len()
     }
+
+    pub fn downcast_unsafe(trait_obj: &dyn ruaft::Persister) -> &Self {
+        unsafe { &*(trait_obj as *const dyn ruaft::Persister as *const Self) }
+    }
 }

+ 14 - 9
test_configs/src/raft/config.rs

@@ -22,7 +22,7 @@ struct LogState {
     committed_logs: Vec<Vec<i32>>,
     results: Vec<Result<()>>,
     max_index: usize,
-    saved: Vec<Arc<crate::Persister>>,
+    saved: Vec<Option<crate::Persister>>,
 }
 
 pub struct Config {
@@ -297,16 +297,19 @@ impl Config {
         // 4. Follower appended entries, replied to the leader. Note although
         // the follower is removed from the network, it can still send replies.
         // 5. The leader believes the entries are appended, but they are not.
-        let data = self.log.lock().saved[index].read_state();
+
         // Make sure to give up the log lock before calling external code, which
         // might directly or indirectly block on the log lock, e.g. through
         // the apply command function.
-        if let Some(raft) = raft {
-            raft.kill().join();
-        }
+        let Some(raft) = raft else { return };
+
+        let data = raft.persister().read_state();
+        raft.kill().join();
+
         let mut log = self.log.lock();
-        log.saved[index] = Arc::new(crate::Persister::new());
-        log.saved[index].save_state(data);
+        let persister = crate::Persister::new();
+        persister.save_state(data);
+        log.saved[index] = Some(persister);
     }
 
     pub fn start1(&self, index: usize) -> Result<()> {
@@ -324,7 +327,9 @@ impl Config {
                 )))
             }
         }
-        let persister = self.log.lock().saved[index].clone();
+        let persister = self.log.lock().saved[index]
+            .take()
+            .expect("A persister must be present to create a raft server");
 
         let log = self.log.clone();
         let raft = Raft::new(
@@ -488,7 +493,7 @@ pub fn make_config(
     });
 
     let mut saved = vec![];
-    saved.resize_with(server_count, || Arc::new(crate::Persister::new()));
+    saved.resize_with(server_count, || Some(crate::Persister::new()));
     let log = Arc::new(Mutex::new(LogState {
         committed_logs: vec![vec![]; server_count],
         results: vec![],

+ 13 - 0
tests/snapshot_tests.rs

@@ -167,3 +167,16 @@ fn linearizability() {
         test_linearizability: true,
     });
 }
+
+#[ignore = "Large test with too many threads"]
+#[test]
+fn snapshot_throughput() {
+    init_test_log!();
+    generic_test(GenericTestParams {
+        // To boost client count to 48 we need more network threads in labrpc.
+        clients: 32,
+        crash: true,
+        maxraftstate: Some(10000),
+        ..Default::default()
+    })
+}