Эх сурвалжийг харах

Some polishing and minor improvements.

Jing Yang 5 жил өмнө
parent
commit
a305bbcd97

+ 6 - 2
src/install_snapshot.rs

@@ -9,7 +9,7 @@ pub(crate) struct InstallSnapshotArgs {
     leader_id: Peer,
     pub(crate) last_included_index: Index,
     last_included_term: Term,
-    // TODO(ditsing): this seems less efficient.
+    // TODO(ditsing): Serde cannot handle Vec<u8> as efficient as expected.
     data: Vec<u8>,
     offset: usize,
     done: bool,
@@ -48,6 +48,7 @@ impl<C: Clone + Default + serde::Serialize> Raft<C> {
         self.election.reset_election_timer();
 
         // The above code is exactly the same as AppendEntries.
+
         if args.last_included_index < rf.log.end()
             && args.last_included_index >= rf.log.start()
             && args.last_included_term == rf.log[args.last_included_index].term
@@ -65,7 +66,10 @@ impl<C: Clone + Default + serde::Serialize> Raft<C> {
         if rf.commit_index > last_log_index {
             rf.commit_index = last_log_index;
         }
-        self.persister.save_state(bytes::Bytes::new()); // TODO(ditsing)
+        self.persister.save_snapshot_and_state(
+            rf.persisted_state().into(),
+            rf.log.snapshot().1,
+        );
 
         self.apply_command_signal.notify_one();
         InstallSnapshotReply { term: args.term }

+ 14 - 8
src/snapshot.rs

@@ -57,21 +57,27 @@ impl<C: 'static + Clone + Default + Send + serde::Serialize> Raft<C> {
                 let snapshot = request_snapshot(log_start.index + 1);
 
                 let mut rf = rf.lock();
-                if rf.current_term != term
-                    || rf.log.first_index_term() != log_start
-                {
-                    // Term has changed, or another snapshot was installed.
+                if rf.log.first_index_term() != log_start {
+                    // Another snapshot was installed, let's try again.
                     unparker.unpark();
                     continue;
                 }
-                if snapshot.last_included_index <= rf.log.start()
-                    || snapshot.last_included_index >= rf.log.end()
-                {
-                    // TODO(ditsing): Something happened.
+                if snapshot.last_included_index <= rf.log.start() {
+                    // It seems the request_snapshot callback is misbehaving,
+                    // let's try again.
                     unparker.unpark();
                     continue;
                 }
 
+                if snapshot.last_included_index >= rf.log.end() {
+                    // We recently rolled back some of the committed logs. This
+                    // can happen but usually the same exact log entries will be
+                    // installed in the next AppendEntries request.
+                    // There is no need to retry, because when the log entries
+                    // are re-committed, we will be notified again.
+                    continue;
+                }
+
                 rf.log.shift(snapshot.last_included_index, snapshot.data);
                 persister.save_snapshot_and_state(
                     rf.persisted_state().into(),