Przeglądaj źródła

Add the basic agreement tests, fixed bugs.

Jing Yang 5 lat temu
rodzic
commit
62a468ee3b
4 zmienionych plików z 184 dodań i 6 usunięć
  1. 5 4
      src/lib.rs
  2. 1 1
      src/rpcs.rs
  3. 28 0
      tests/agreement_tests.rs
  4. 150 1
      tests/config/mod.rs

+ 5 - 4
src/lib.rs

@@ -38,7 +38,7 @@ struct Peer(usize);
 pub type Index = usize;
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct Command(usize);
+pub struct Command(pub i32);
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 struct LogEntry {
@@ -139,7 +139,7 @@ impl Raft {
             }],
             commit_index: 0,
             last_applied: 0,
-            next_index: vec![0; peer_size],
+            next_index: vec![1; peer_size],
             match_index: vec![0; peer_size],
             current_step: vec![0; peer_size],
             state: State::Follower,
@@ -262,7 +262,7 @@ impl Raft {
         self.election.reset_election_timer();
 
         if rf.log.len() <= args.prev_log_index
-            || rf.log[args.prev_log_index].term != args.term
+            || rf.log[args.prev_log_index].term != args.prev_log_term
         {
             return AppendEntriesReply {
                 term: args.term,
@@ -677,7 +677,8 @@ impl Raft {
         peer_index: usize,
     ) -> AppendEntriesArgs {
         let rf = rf.lock();
-        let (prev_log_index, prev_log_term) = rf.last_log_index_and_term();
+        let prev_log_index = rf.next_index[peer_index] - 1;
+        let prev_log_term = rf.log[prev_log_index].term;
         AppendEntriesArgs {
             term: rf.current_term,
             leader_id: rf.leader_id,

+ 1 - 1
src/rpcs.rs

@@ -160,7 +160,7 @@ mod tests {
             rpc_client.clone().call_append_entries(request),
         )?;
         assert_eq!(2021, response.term.0);
-        assert_eq!(false, response.success);
+        assert_eq!(true, response.success);
 
         Ok(())
     }

+ 28 - 0
tests/agreement_tests.rs

@@ -0,0 +1,28 @@
+extern crate labrpc;
+extern crate ruaft;
+#[macro_use]
+extern crate anyhow;
+
+mod config;
+
+#[test]
+fn basic_agreement() -> config::Result<()> {
+    const SERVERS: usize = 5;
+    let cfg = config::make_config(SERVERS, false);
+    let _guard = cfg.deferred_cleanup();
+
+    cfg.begin("Test (2B): basic agreement");
+    for index in 1..4 {
+        let committed = cfg.committed_count(index)?;
+        assert_eq!(0, committed.0, "some have committed before start()");
+
+        let commit_index = cfg.one(index as i32 * 100, SERVERS, false)?;
+        assert_eq!(
+            index, commit_index,
+            "got index {} but expected {}",
+            commit_index, index
+        );
+    }
+
+    Ok(())
+}

+ 150 - 1
tests/config/mod.rs

@@ -10,13 +10,23 @@ struct ConfigState {
     connected: Vec<bool>,
 }
 
+struct LogState {
+    committed_logs: Vec<Vec<i32>>,
+    results: Vec<Result<()>>,
+    max_index: usize,
+}
+
 pub struct Config {
     network: Arc<std::sync::Mutex<labrpc::Network>>,
     server_count: usize,
     state: Mutex<ConfigState>,
+    log: Arc<Mutex<LogState>>,
 }
 
 pub use anyhow::Result;
+use ruaft::utils::DropGuard;
+use std::time::Instant;
+use tokio::time::Duration;
 
 impl Config {
     fn server_name(i: usize) -> String {
@@ -105,6 +115,80 @@ impl Config {
         Ok(())
     }
 
+    /// Returns the number of peers that committed at least `index` commands,
+    /// as well as the command at the index.
+    pub fn committed_count(&self, index: usize) -> Result<(usize, i32)> {
+        let mut count = 0;
+        let mut cmd = Self::INVALID_COMMAND;
+        for i in 0..self.server_count {
+            let log = self.log.lock();
+            if let Err(e) = &log.results[i] {
+                bail!(e.to_string())
+            }
+            if log.committed_logs[i].len() > index {
+                let command = log.committed_logs[i][index];
+                if count > 0 {
+                    assert_eq!(
+                        command, cmd,
+                        "committed values do not match: index {}, {}, {}",
+                        index, cmd, command
+                    )
+                }
+                count += 1;
+                cmd = command;
+            }
+        }
+        Ok((count, cmd))
+    }
+
+    pub fn one(
+        &self,
+        cmd: i32,
+        expected_servers: usize,
+        retry: bool,
+    ) -> Result<usize> {
+        let start = Instant::now();
+        let mut cnt = 0;
+        while start.elapsed() < Duration::from_secs(10) {
+            let mut first_index = None;
+            for _ in 0..self.server_count {
+                cnt += 1;
+                cnt %= self.server_count;
+                let state = self.state.lock();
+                if state.connected[cnt] {
+                    if let Some(raft) = &state.rafts[cnt] {
+                        if let Some((_, index)) =
+                            raft.start(ruaft::Command(cmd))
+                        {
+                            first_index.replace(index);
+                        }
+                    }
+                }
+            }
+
+            if let Some(index) = first_index {
+                let agreement_start = Instant::now();
+                while agreement_start.elapsed() < Duration::from_secs(2) {
+                    let (commit_count, committed_command) =
+                        self.committed_count(index)?;
+                    if commit_count > 0
+                        && commit_count >= expected_servers
+                        && committed_command == cmd
+                    {
+                        return Ok(index);
+                    }
+                    sleep_millis(20);
+                }
+                if !retry {
+                    break;
+                }
+            } else {
+                sleep_millis(50);
+            }
+        }
+        Err(anyhow!("one({}) failed to reach agreement", cmd))
+    }
+
     pub fn connect(&self, index: usize) {
         self.set_connect(index, true);
     }
@@ -158,7 +242,10 @@ impl Config {
             }
         }
 
-        let raft = Raft::new(clients, index, |_, _| {});
+        let log_clone = self.log.clone();
+        let raft = Raft::new(clients, index, move |cmd_index, cmd| {
+            Self::apply_command(log_clone.clone(), index, cmd_index, cmd.0)
+        });
         self.state.lock().rafts[index].replace(raft.clone());
 
         let raft = Arc::new(raft);
@@ -178,6 +265,61 @@ impl Config {
             }
         }
     }
+
+    pub fn deferred_cleanup(&self) -> impl Drop + '_ {
+        DropGuard::new(move || self.cleanup())
+    }
+}
+
+impl Config {
+    const INVALID_COMMAND: i32 = -1;
+
+    fn apply_command(
+        log_state: Arc<Mutex<LogState>>,
+        server_index: usize,
+        index: usize,
+        command: i32,
+    ) {
+        let mut log_state = log_state.lock();
+        let committed_logs = &mut log_state.committed_logs;
+        let mut err = None;
+        for (one_index, one_server) in committed_logs.iter().enumerate() {
+            if one_server.len() > index && one_server[index] != command {
+                err = Some((
+                    one_index,
+                    Err(anyhow!(
+                        "commit index ={} server={} {} != server={} {}",
+                        index,
+                        server_index,
+                        command,
+                        one_index,
+                        one_server[index],
+                    )),
+                ));
+                break;
+            }
+        }
+
+        let one_server = &mut committed_logs[server_index];
+        if one_server.len() <= index {
+            one_server.resize(index + 1, Self::INVALID_COMMAND);
+        }
+        one_server[index] = command;
+
+        if index > 1 && one_server[index - 1] == Self::INVALID_COMMAND {
+            log_state.results[server_index] = Err(anyhow!(
+                "server {} apply out of order {}",
+                server_index,
+                index
+            ));
+        } else if let Some((one_index, err)) = err {
+            log_state.results[one_index] = err
+        }
+
+        if index > log_state.max_index {
+            log_state.max_index = index;
+        }
+    }
 }
 
 fn unlock<T>(locked: &std::sync::Mutex<T>) -> std::sync::MutexGuard<T> {
@@ -196,10 +338,17 @@ pub fn make_config(server_count: usize, unreliable: bool) -> Config {
         rafts: vec![None; server_count],
         connected: vec![true; server_count],
     });
+    let log = Arc::new(Mutex::new(LogState {
+        committed_logs: vec![vec![]; server_count],
+        results: vec![],
+        max_index: 0,
+    }));
+    log.lock().results.resize_with(server_count, || Ok(()));
     let mut cfg = Config {
         network,
         server_count,
         state,
+        log,
     };
 
     for i in 0..server_count {