Forráskód Böngészése

Replace SeqCst ordering with more relaxed ones in keep_running.

Jing Yang 3 éve
szülő
commit
ca59d5b7ac
7 módosított fájl, 10 hozzáadás és 10 törlés
  1. 1 1
      src/apply_command.rs
  2. 2 2
      src/election.rs
  3. 1 1
      src/heartbeats.rs
  4. 1 1
      src/raft.rs
  5. 2 2
      src/snapshot.rs
  6. 1 1
      src/sync_log_entries.rs
  7. 2 2
      src/verify_authority.rs

+ 1 - 1
src/apply_command.rs

@@ -58,7 +58,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
         let apply_command_daemon = move || {
             log::info!("{:?} apply command daemon running ...", me);
 
-            while keep_running.load(Ordering::SeqCst) {
+            while keep_running.load(Ordering::Relaxed) {
                 let messages = {
                     let mut rf = rf.lock();
                     if rf.last_applied >= rf.commit_index {

+ 2 - 2
src/election.rs

@@ -127,7 +127,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
             let election = this.election.clone();
 
             let mut should_run = None;
-            while this.keep_running.load(Ordering::SeqCst) {
+            while this.keep_running.load(Ordering::Relaxed) {
                 let mut cancel_handle =
                     should_run.and_then(|last_timer_count| {
                         this.run_election(last_timer_count)
@@ -158,7 +158,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 // check the running signal before sleeping. We are holding the
                 // timer lock, so no one can change it. The kill() method will
                 // not be able to notify this thread before `wait` is called.
-                if !this.keep_running.load(Ordering::SeqCst) {
+                if !this.keep_running.load(Ordering::Relaxed) {
                     break;
                 }
                 should_run = match deadline {

+ 1 - 1
src/heartbeats.rs

@@ -87,7 +87,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
                 let keep_running = self.keep_running.clone();
                 self.thread_pool.spawn(async move {
                     let mut interval = tokio::time::interval(interval);
-                    while keep_running.load(Ordering::SeqCst) {
+                    while keep_running.load(Ordering::Relaxed) {
                         let tick = interval.tick();
                         let trigger = trigger.recv();
                         futures_util::pin_mut!(tick, trigger);

+ 1 - 1
src/raft.rs

@@ -171,7 +171,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
     /// Cleanly shutdown this instance. This function never blocks forever. It
     /// either panics or returns eventually.
     pub fn kill(mut self) {
-        self.keep_running.store(false, Ordering::SeqCst);
+        self.keep_running.store(false, Ordering::Release);
         self.election.stop_election_timer();
         self.new_log_entry.take().map(|n| n.send(None));
         self.apply_command_signal.notify_all();

+ 2 - 2
src/snapshot.rs

@@ -146,7 +146,7 @@ impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
         log::info!("{:?} snapshot daemon running ...", me);
         let snapshot_daemon = move || loop {
             parker.park();
-            if !keep_running.load(Ordering::SeqCst) {
+            if !keep_running.load(Ordering::Acquire) {
                 log::info!("{:?} snapshot daemon done.", me);
 
                 // Explicitly drop every thing.
@@ -161,7 +161,7 @@ impl<C: 'static + Clone + Send + serde::Serialize> Raft<C> {
                 let snapshot = {
                     let mut snapshot =
                         snapshot_daemon.current_snapshot.0.lock();
-                    if keep_running.load(Ordering::SeqCst)
+                    if keep_running.load(Ordering::Acquire)
                         && snapshot.last_included_index <= log_start.index
                     {
                         request_snapshot(log_start.index + 1);

+ 1 - 1
src/sync_log_entries.rs

@@ -72,7 +72,7 @@ impl<Command: ReplicableCommand> Raft<Command> {
 
             let mut task_number = 0;
             while let Ok(peer) = rx.recv() {
-                if !this.keep_running.load(Ordering::SeqCst) {
+                if !this.keep_running.load(Ordering::Relaxed) {
                     break;
                 }
                 if !this.inner_state.lock().is_leader() {

+ 2 - 2
src/verify_authority.rs

@@ -403,7 +403,7 @@ impl<Command: 'static + Send> Raft<Command> {
 
         let verify_authority_daemon = move || {
             log::info!("{:?} verify authority daemon running ...", me);
-            while keep_running.load(Ordering::Acquire) {
+            while keep_running.load(Ordering::Relaxed) {
                 this_daemon.wait_for(Self::BEAT_RECORDING_MAX_PAUSE);
                 let (current_term, commit_index) = {
                     let rf = rf.lock();
@@ -436,7 +436,7 @@ impl<Command: 'static + Send> Raft<Command> {
         &self,
     ) -> Option<impl Future<Output = crate::VerifyAuthorityResult>> {
         // Fail the request if we have been killed.
-        if !self.keep_running.load(Ordering::Acquire) {
+        if !self.keep_running.load(Ordering::Relaxed) {
             return None;
         }