Parcourir la source

Add a shutdown signal.

Jing Yang il y a 5 ans
Parent
commit
e98ab1a56e
1 fichiers modifiés avec 11 ajouts et 3 suppressions
  1. 11 3
      src/lib.rs

+ 11 - 3
src/lib.rs

@@ -9,7 +9,7 @@ extern crate serde_derive;
 extern crate tokio;
 
 use std::future::Future;
-use std::sync::atomic::AtomicBool;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -98,7 +98,7 @@ struct Raft {
     // new_log_entry: Sender<usize>,
     // new_log_entry: Receiver<usize>,
     // apply_command_cond: Condvar
-    keep_running: AtomicBool,
+    keep_running: Arc<AtomicBool>,
     // applyCh: Sender<ApplyMsg>
 }
 
@@ -386,9 +386,13 @@ impl Raft {
                 let rf = self.inner_state.clone();
                 // RPC client must be cloned into the outer async function.
                 let rpc_client = rpc_client.clone();
+                // Shutdown signal.
+                let keep_running = self.keep_running.clone();
                 tokio::spawn(async move {
                     loop {
-                        // TODO: shutdown signal or cancel token.
+                        if !keep_running.load(Ordering::SeqCst) {
+                            break;
+                        }
                         interval.tick().await;
                         if let Some(args) = Self::build_heartbeat(&rf) {
                             tokio::spawn(Self::send_heartbeat(
@@ -448,6 +452,7 @@ impl Raft {
         let peers = self.peers.clone();
         let rf = self.inner_state.clone();
         let me = self.me;
+        let keep_running = self.keep_running.clone();
         let handle = std::thread::spawn(move || {
             while let Ok(peer) = rx.recv() {
                 for (i, rpc_client) in peers.iter().enumerate() {
@@ -460,6 +465,9 @@ impl Raft {
                         ));
                     }
                 }
+                if !keep_running.load(Ordering::SeqCst) {
+                    break;
+                }
             }
         });