瀏覽代碼

Move commit index after a successful sync.

Jing Yang 5 年之前
父節點
當前提交
b5f50b019d
共有 1 個文件被更改,包括 32 次插入4 次删除
  1. 32 4
      src/lib.rs

+ 32 - 4
src/lib.rs

@@ -474,10 +474,32 @@ impl Raft {
     ) {
         // TODO: cancel in flight changes?
         let args = Self::build_append_entries(&rf, peer_index);
+        let term = args.term;
+        let match_index = args.prev_log_index + args.entries.len();
         let succeeded = Self::append_entries(rpc_client, args).await;
         match succeeded {
-            Ok(done) => {
-                if !done {
+            Ok(Some(succeeded)) => {
+                if succeeded {
+                    let mut rf = rf.lock();
+                    rf.next_index[peer_index] = match_index + 1;
+                    if match_index > rf.match_index[peer_index] {
+                        rf.match_index[peer_index] = match_index;
+                        if rf.state == State::Leader && rf.current_term == term
+                        {
+                            let mut matched = rf.match_index.to_vec();
+                            let mid = matched.len() / 2 + 1;
+                            matched.sort();
+                            let new_commit_index = matched[mid];
+                            if new_commit_index > rf.commit_index
+                                && rf.log[new_commit_index].term
+                                    == rf.current_term
+                            {
+                                rf.commit_index = new_commit_index;
+                                // TODO: apply command.
+                            }
+                        }
+                    }
+                } else {
                     let mut rf = rf.lock();
 
                     let step = &mut rf.current_step[peer_index];
@@ -494,6 +516,8 @@ impl Raft {
                     rerun.send(Some(Peer(peer_index)));
                 }
             }
+            // Do nothing, not our term anymore.
+            Ok(None) => {}
             Err(_) => {
                 tokio::time::delay_for(Duration::from_millis(
                     HEARTBEAT_INTERVAL_MILLIS,
@@ -524,13 +548,17 @@ impl Raft {
     async fn append_entries(
         rpc_client: RpcClient,
         args: AppendEntriesArgs,
-    ) -> std::io::Result<bool> {
+    ) -> std::io::Result<Option<bool>> {
         let term = args.term;
         let reply = retry_rpc(Self::APPEND_ENTRIES_RETRY, move |_round| {
             rpc_client.clone().call_append_entries(args.clone())
         })
         .await?;
-        Ok(reply.term != term || reply.success)
+        Ok(if reply.term == term {
+            Some(reply.success)
+        } else {
+            None
+        })
     }
 }