Преглед на файлове

Add the figure-8 unreliable test, and a serious bug fix.

The previous version of sync_log_entries might build and send an
append-entries request when the server was no longer the leader at the
current term, causing log corruption.

Also fixed the apply-command bug: each time, the last_applied index
should be advanced to the commit index, instead of by 1.
Jing Yang преди 5 години
родител
ревизия
d7d18284fa
променени са 3 файла, в които са добавени 99 реда и са изтрити 8 реда
  1. 27 8
      src/lib.rs
  2. 8 0
      tests/config/mod.rs
  3. 64 0
      tests/persist_tests.rs

+ 27 - 8
src/lib.rs

@@ -627,10 +627,25 @@ impl Raft {
         apply_command_signal: Arc<Condvar>,
     ) {
         // TODO: cancel in flight changes?
-        let args = Self::build_append_entries(&rf, peer_index);
+        let args = match Self::build_append_entries(&rf, peer_index) {
+            Some(args) => args,
+            None => return,
+        };
         let term = args.term;
         let match_index = args.prev_log_index + args.entries.len();
-        let succeeded = Self::append_entries(rpc_client, args).await;
+        let result = tokio::time::timeout(
+            Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS),
+            Self::append_entries(rpc_client, args),
+        )
+        .await;
+
+        let succeeded = match result {
+            Ok(succeeded) => succeeded,
+            Err(_) => {
+                let _ = rerun.send(Some(Peer(peer_index)));
+                return;
+            }
+        };
         match succeeded {
             Ok(Some(succeeded)) => {
                 if succeeded {
@@ -690,18 +705,21 @@ impl Raft {
     fn build_append_entries(
         rf: &Arc<Mutex<RaftState>>,
         peer_index: usize,
-    ) -> AppendEntriesArgs {
+    ) -> Option<AppendEntriesArgs> {
         let rf = rf.lock();
+        if !rf.is_leader() {
+            return None;
+        }
         let prev_log_index = rf.next_index[peer_index] - 1;
         let prev_log_term = rf.log[prev_log_index].term;
-        AppendEntriesArgs {
+        Some(AppendEntriesArgs {
             term: rf.current_term,
             leader_id: rf.leader_id,
             prev_log_index,
             prev_log_term,
             entries: rf.log[rf.next_index[peer_index]..].to_vec(),
             leader_commit: rf.commit_index,
-        }
+        })
     }
 
     const APPEND_ENTRIES_RETRY: usize = 3;
@@ -743,12 +761,13 @@ impl Raft {
                         );
                     }
                     if rf.last_applied < rf.commit_index {
-                        rf.last_applied += 1;
-                        let index = rf.last_applied;
-                        let commands: Vec<Command> = rf.log[index..]
+                        let index = rf.last_applied + 1;
+                        let last_one = rf.commit_index + 1;
+                        let commands: Vec<Command> = rf.log[index..last_one]
                             .iter()
                             .map(|entry| entry.command.clone())
                             .collect();
+                        rf.last_applied = rf.commit_index;
                         (index, commands)
                     } else {
                         continue;

+ 8 - 0
tests/config/mod.rs

@@ -328,6 +328,10 @@ impl Config {
             .unwrap()
     }
 
+    pub fn is_connected(&self, index: usize) -> bool {
+        self.state.lock().connected[index]
+    }
+
     pub fn is_server_alive(&self, index: usize) -> bool {
         self.state.lock().rafts[index].is_some()
     }
@@ -340,6 +344,10 @@ impl Config {
         unlock(&self.network).set_reliable(!yes);
     }
 
+    pub fn set_long_reordering(&self, yes: bool) {
+        unlock(&self.network).set_long_reordering(yes);
+    }
+
     pub fn end(&self) {}
 
     pub fn cleanup(&self) {

+ 64 - 0
tests/persist_tests.rs

@@ -242,3 +242,67 @@ fn unreliable_agree() -> config::Result<()> {
     drop(_guard);
     Ok(())
 }
+
+#[test]
+fn figure8_unreliable() -> config::Result<()> {
+    const SERVERS: usize = 5;
+    let cfg = config::make_config(SERVERS, false);
+    let _guard = cfg.deferred_cleanup();
+
+    cfg.begin("Test (2C): Figure 8 (unreliable)");
+
+    cfg.one(thread_rng().gen_range(0, 10000), 1, true)?;
+
+    let mut nup = SERVERS;
+    for iters in 0..1000 {
+        if iters == 200 {
+            cfg.set_long_reordering(true);
+        }
+
+        let mut leader = None;
+        for i in 0..SERVERS {
+            if cfg.is_server_alive(i) {
+                if let Some(_) = cfg.leader_start(i, thread_rng().gen()) {
+                    if cfg.is_connected(i) {
+                        leader = Some(i);
+                    }
+                }
+            }
+        }
+
+        let millis_upper = if thread_rng().gen_ratio(100, 1000) {
+            config::LONG_ELECTION_TIMEOUT_MILLIS >> 1
+        } else {
+            // Magic number 13?
+            13
+        };
+        let millis = thread_rng().gen_range(0, millis_upper);
+        config::sleep_millis(millis);
+
+        if let Some(leader) = leader {
+            if thread_rng().gen_ratio(1, 2) {
+                cfg.disconnect(leader);
+                nup -= 1;
+            }
+        }
+
+        if nup < 3 {
+            let index = thread_rng().gen_range(0, SERVERS);
+            if !cfg.is_connected(index) {
+                cfg.connect(index);
+                nup += 1
+            }
+        }
+    }
+
+    for index in 0..SERVERS {
+        if !cfg.is_connected(index) {
+            cfg.connect(index);
+        }
+    }
+
+    cfg.one(thread_rng().gen_range(0, 10000), SERVERS, true)?;
+
+    drop(_guard);
+    Ok(())
+}