Explorar el Código

Optimize syn log entries

1. Avoid unnecessary scheduling if there is one in queue already.
2. Spread the shared task count to the size of L1 cache line, to
avoid frequent cache flips.
Jing Yang hace 5 años
padre
commit
f63dfdfc7f
Se han modificado 1 ficheros con 20 adiciones y 13 borrados
  1. 20 13
      src/lib.rs

+ 20 - 13
src/lib.rs

@@ -126,6 +126,9 @@ struct AppendEntriesReply {
     success: bool,
 }
 
+#[repr(align(64))]
+struct Opening(Arc<AtomicUsize>);
+
 impl Raft {
     /// Create a new raft instance.
     ///
@@ -605,8 +608,10 @@ impl Raft {
         let this = self.clone();
         std::thread::spawn(move || {
             let mut openings = vec![];
-            openings.resize_with(this.peers.len(), || AtomicUsize::new(0));
-            let openings = Arc::new(openings); // Not mutable beyond this point.
+            openings.resize_with(this.peers.len(), || {
+                Opening(Arc::new(AtomicUsize::new(0)))
+            });
+            let openings = openings; // Not mutable beyond this point.
 
             while let Ok(peer) = rx.recv() {
                 if !this.keep_running.load(Ordering::SeqCst) {
@@ -618,15 +623,18 @@ impl Raft {
                 for (i, rpc_client) in this.peers.iter().enumerate() {
                     if i != this.me.0 && peer.map(|p| p.0 == i).unwrap_or(true)
                     {
-                        openings[i].fetch_add(1, Ordering::SeqCst);
-                        this.thread_pool.spawn(Self::sync_log_entry(
-                            this.inner_state.clone(),
-                            rpc_client.clone(),
-                            i,
-                            this.new_log_entry.clone().unwrap(),
-                            openings.clone(),
-                            this.apply_command_signal.clone(),
-                        ));
+                        // Only schedule a new task if the last task has cleared
+                        // the queue of RPC requests.
+                        if openings[i].0.fetch_add(1, Ordering::SeqCst) == 0 {
+                            this.thread_pool.spawn(Self::sync_log_entry(
+                                this.inner_state.clone(),
+                                rpc_client.clone(),
+                                i,
+                                this.new_log_entry.clone().unwrap(),
+                                openings[i].0.clone(),
+                                this.apply_command_signal.clone(),
+                            ));
+                        }
                     }
                 }
             }
@@ -643,10 +651,9 @@ impl Raft {
         rpc_client: Arc<RpcClient>,
         peer_index: usize,
         rerun: std::sync::mpsc::Sender<Option<Peer>>,
-        openings: Arc<Vec<AtomicUsize>>,
+        opening: Arc<AtomicUsize>,
         apply_command_signal: Arc<Condvar>,
     ) {
-        let opening = &openings[peer_index];
         if opening.swap(0, Ordering::SeqCst) == 0 {
             return;
         }