Kaynağa Gözat

Fix threading model and let background tasks take a weak Arc.

So that we can now safely kill our background tasks.
Jing Yang 4 yıl önce
ebeveyn
işleme
044cf08151
2 değiştirilmiş dosya ile 12 ekleme ve 4 silme
  1. 8 3
      kvraft/src/server.rs
  2. 4 1
      kvraft/tests/service_test.rs

+ 8 - 3
kvraft/src/server.rs

@@ -111,7 +111,7 @@ impl KVServer {
                 Raft::<UniqueKVOp>::NO_SNAPSHOT,
             )),
         });
-        ret.clone().process_command(rx);
+        ret.process_command(rx);
         ret
     }
 
@@ -178,12 +178,17 @@ impl KVServer {
     }
 
     fn process_command(
-        self: Arc<Self>,
+        self: &Arc<Self>,
         command_channel: Receiver<IndexedCommand>,
     ) {
+        let this = Arc::downgrade(self);
         std::thread::spawn(move || {
             while let Ok((_, command)) = command_channel.recv() {
-                self.apply_op(command.unique_id, command.me, command.op);
+                if let Some(this) = this.upgrade() {
+                    this.apply_op(command.unique_id, command.me, command.op);
+                } else {
+                    break;
+                }
             }
         });
     }

+ 4 - 1
kvraft/tests/service_test.rs

@@ -104,7 +104,7 @@ fn generic_test(test_params: GenericTestParams) {
     let maxraftstate = maxraftstate.unwrap_or(usize::MAX);
     const SERVERS: usize = 5;
     let cfg = Arc::new(make_config(SERVERS, unreliable, maxraftstate));
-    // TODO(ditsing): add `defer!(cfg.clean_up());`
+    defer!(cfg.clean_up());
 
     cfg.begin("");
     let mut clerk = cfg.make_clerk();
@@ -243,6 +243,8 @@ fn unreliable_many_clients() {
 fn unreliable_one_key_many_clients() -> anyhow::Result<()> {
     const SERVERS: usize = 5;
     let cfg = Arc::new(make_config(SERVERS, true, 0));
+    defer!(cfg.clean_up());
+
     let mut clerk = cfg.make_clerk();
 
     cfg.begin("Test: concurrent append to same key, unreliable (3A)");
@@ -269,6 +271,7 @@ fn unreliable_one_key_many_clients() -> anyhow::Result<()> {
 fn one_partition() -> anyhow::Result<()> {
     const SERVERS: usize = 5;
     let cfg = Arc::new(make_config(SERVERS, false, 0));
+    defer!(cfg.clean_up());
 
     cfg.begin("Test: progress in majority (3A)");