Procházet zdrojové kódy

Use term in a smart way to avoid duplicate commits.

The assumptions are
1. If the term has not yet changed since last attempt, no need to retry.
2. If another attempt is being made, no need to retry.
3. Retry in all other cases.

Note that even if the term has changed, and no other attempt is being
made, it is still possible that our previous attempt are committed.
We'll retry anyway, and get notified when the first attempt or the
current attempt is successful.
Jing Yang před 4 roky
rodič
revize
1207b5d42f
1 změnil soubory, kde provedl 62 přidání a 7 odebrání
  1. 62 7
      kvraft/src/server.rs

+ 62 - 7
kvraft/src/server.rs

@@ -3,7 +3,7 @@ use super::common::{
     PutAppendReply, UniqueId,
 };
 use parking_lot::{Condvar, Mutex};
-use ruaft::{Persister, Raft, RpcClient};
+use ruaft::{Persister, Raft, RpcClient, Term};
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::sync::atomic::{AtomicUsize, Ordering};
@@ -50,6 +50,7 @@ impl Default for KVOp {
 }
 
 struct ResultHolder {
+    term: AtomicUsize,
     result: Mutex<Result<CommitResult, CommitError>>,
     condvar: Condvar,
 }
@@ -182,6 +183,8 @@ impl KVServer {
         });
     }
 
+    const UNSEEN_TERM: usize = 0;
+    const ATTEMPTING_TERM: usize = usize::MAX;
     fn block_for_commit(
         &self,
         unique_id: UniqueId,
@@ -200,6 +203,7 @@ impl KVServer {
             };
             let entry = state.queries.entry(unique_id).or_insert_with(|| {
                 Arc::new(ResultHolder {
+                    term: AtomicUsize::new(Self::UNSEEN_TERM),
                     result: Mutex::new(Err(CommitError::TimedOut)),
                     condvar: Condvar::new(),
                 })
@@ -207,14 +211,57 @@ impl KVServer {
             entry.clone()
         };
 
-        let op = UniqueKVOp {
-            op,
-            me: self.me(),
-            unique_id,
-        };
-        if self.rf.lock().start(op).is_none() {
+        let (Term(hold_term), is_leader) = self.rf.lock().get_state();
+        if !is_leader {
+            result_holder.condvar.notify_all();
             return Err(CommitError::NotLeader);
         }
+        Self::validate_term(hold_term);
+
+        let set = result_holder.term.compare_exchange(
+            Self::UNSEEN_TERM,
+            Self::ATTEMPTING_TERM,
+            Ordering::SeqCst,
+            Ordering::SeqCst,
+        );
+        let start = match set {
+            // Nobody has attempted start() yet.
+            Ok(Self::UNSEEN_TERM) => true,
+            Ok(_) => panic!(
+                "compare_exchange should always return the current value 0"
+            ),
+            // Somebody has attempted, or is attempting, start().
+            Err(prev_term) => {
+                prev_term != Self::ATTEMPTING_TERM && prev_term < hold_term
+            }
+        };
+        if start {
+            let op = UniqueKVOp {
+                op,
+                me: self.me(),
+                unique_id,
+            };
+            let start = self.rf.lock().start(op);
+            let start_term =
+                start.map_or(Self::UNSEEN_TERM, |(Term(term), _)| {
+                    Self::validate_term(term);
+                    term
+                });
+            let set = result_holder.term.compare_exchange(
+                Self::ATTEMPTING_TERM,
+                start_term,
+                Ordering::SeqCst,
+                Ordering::SeqCst,
+            );
+            // Setting term must have been successful, and must return the
+            // value previously set by this attempt.
+            assert_eq!(set, Ok(Self::ATTEMPTING_TERM));
+
+            if start_term == Self::UNSEEN_TERM {
+                result_holder.condvar.notify_all();
+                return Err(CommitError::NotLeader);
+            }
+        }
 
         let mut guard = result_holder.result.lock();
         // Wait for the op to be committed.
@@ -230,6 +277,14 @@ impl KVServer {
         return result;
     }
 
+    fn validate_term(term: usize) {
+        assert!(term > Self::UNSEEN_TERM, "Term must be larger than 0.");
+        assert!(
+            term < Self::ATTEMPTING_TERM,
+            "Term must be smaller than usize::MAX."
+        );
+    }
+
     const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2);
 
     pub fn get(&self, args: GetArgs) -> GetReply {