Browse Source

Add a stop barrier.

Jing Yang 5 năm trước cách đây
mục cha
commit
8aee179c10
1 tập tin đã thay đổi với 18 bổ sung2 xóa
  1. 18 2
      src/lib.rs

+ 18 - 2
src/lib.rs

@@ -84,6 +84,8 @@ pub struct Raft {
     election: Arc<ElectionState>,
 
     thread_pool: Arc<tokio::runtime::Runtime>,
+
+    stop_barrier: Arc<std::sync::Barrier>,
 }
 
 #[derive(Clone, Serialize, Deserialize)]
@@ -157,6 +159,7 @@ impl Raft {
             .max_threads(peer_size * 2)
             .build()
             .expect("Creating thread pool should not fail");
+        const RUNNING_THREADS: usize = 4;
         let mut this = Raft {
             inner_state: Arc::new(Mutex::new(state)),
             peers,
@@ -166,6 +169,7 @@ impl Raft {
             keep_running: Arc::new(Default::default()),
             election: Arc::new(election),
             thread_pool: Arc::new(thread_pool),
+            stop_barrier: Arc::new(std::sync::Barrier::new(RUNNING_THREADS)),
         };
 
         // TODO: read persist.
@@ -292,7 +296,7 @@ impl Raft {
         }
     }
 
-    fn run_election_timer(&self) {
+    fn run_election_timer(&self) -> std::thread::JoinHandle<()> {
         let this = self.clone();
         std::thread::spawn(move || {
             let election = this.election.clone();
@@ -355,7 +359,11 @@ impl Raft {
                 // be notified by the election signal.
                 cancel_handle.map(|c| c.send(()));
             }
-        });
+
+            let stop_barrier = this.stop_barrier.clone();
+            drop(this);
+            stop_barrier.wait();
+        })
     }
 
     fn run_election(
@@ -585,6 +593,10 @@ impl Raft {
                     }
                 }
             }
+
+            let stop_barrier = this.stop_barrier.clone();
+            drop(this);
+            stop_barrier.wait();
         })
     }
 
@@ -698,6 +710,7 @@ impl Raft {
         let keep_running = self.keep_running.clone();
         let rf = self.inner_state.clone();
         let condvar = self.apply_command_signal.clone();
+        let stop_barrier = self.stop_barrier.clone();
         std::thread::spawn(move || {
             while keep_running.load(Ordering::SeqCst) {
                 let (mut index, commands) = {
@@ -727,6 +740,8 @@ impl Raft {
                     index += 1;
                 }
             }
+
+            stop_barrier.wait();
         })
     }
 
@@ -759,6 +774,7 @@ impl Raft {
         self.election.stop_election_timer();
         self.new_log_entry.take().map(|n| n.send(None));
         self.apply_command_signal.notify_all();
+        self.stop_barrier.wait();
         std::sync::Arc::try_unwrap(self.thread_pool)
             .expect(
                 "All references to the thread pool should have been dropped.",