Kaynağa Gözat

Setup test logger in KVServer and tests.

Jing Yang 4 yıl önce
ebeveyn
işleme
aae03cec67

+ 4 - 1
kvraft/Cargo.toml

@@ -9,11 +9,14 @@ bytes = "1.0"
 labrpc = "0.1.12"
 parking_lot = "0.11.1"
 rand = "0.8"
-ruaft = { path = "../" }
+ruaft = { path = "..", features = ["integration-test"] }
 linearizability = { path = "../linearizability" }
 serde = "1.0.116"
 serde_derive = "1.0.116"
+test_utils = { path = "../test_utils" }
 tokio = { version = "1.7", features = ["time", "parking_lot"] }
+log = "0.4.14"
 
 [dev-dependencies]
 scopeguard = "1.1.0"
+stdext = "0.3.0"

+ 10 - 0
kvraft/src/server.rs

@@ -8,6 +8,7 @@ use std::time::Duration;
 use parking_lot::{Condvar, Mutex};
 
 use ruaft::{ApplyCommandMessage, Persister, Raft, RpcClient, Term};
+use test_utils::thread_local_logger::LocalLogger;
 
 use crate::common::{
     ClerkId, GetArgs, GetEnum, GetReply, KVError, PutAppendArgs, PutAppendEnum,
@@ -20,6 +21,7 @@ pub struct KVServer {
     state: Mutex<KVServerState>,
     rf: Mutex<Raft<UniqueKVOp>>,
     keep_running: AtomicBool,
+    logger: LocalLogger,
 }
 
 #[derive(Clone, Default, Serialize, Deserialize)]
@@ -115,6 +117,7 @@ impl KVServer {
                 move |index| snapshot_holder_clone.request_snapshot(index),
             )),
             keep_running: AtomicBool::new(true),
+            logger: LocalLogger::inherit(),
         });
         ret.process_command(snapshot_holder, rx);
         ret
@@ -199,7 +202,11 @@ impl KVServer {
         command_channel: Receiver<ApplyCommandMessage<UniqueKVOp>>,
     ) {
         let this = Arc::downgrade(self);
+        let logger = LocalLogger::inherit();
+        let me = self.me();
         std::thread::spawn(move || {
+            logger.attach();
+            log::info!("KVServer {} waiting for commands ...", me);
             while let Ok(message) = command_channel.recv() {
                 if let Some(this) = this.upgrade() {
                     match message {
@@ -224,6 +231,7 @@ impl KVServer {
                     break;
                 }
             }
+            log::info!("KVServer {} stopped waiting for commands.", me);
         });
     }
 
@@ -345,6 +353,7 @@ impl KVServer {
     const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
 
     pub fn get(&self, args: GetArgs) -> GetReply {
+        self.logger.clone().attach();
         let map_dup = match args.op {
             GetEnum::AllowDuplicate => |r| Ok(r),
             GetEnum::NoDuplicate => |_| Err(KVError::Conflict),
@@ -372,6 +381,7 @@ impl KVServer {
     }
 
     pub fn put_append(&self, args: PutAppendArgs) -> PutAppendReply {
+        self.logger.clone().attach();
         let op = match args.op {
             PutAppendEnum::Put => KVOp::Put(args.key, args.value),
             PutAppendEnum::Append => KVOp::Append(args.key, args.value),

+ 6 - 1
kvraft/src/testing_utils/generic_test.rs

@@ -5,6 +5,7 @@ use std::time::{Duration, Instant};
 
 use parking_lot::Mutex;
 use rand::{thread_rng, Rng};
+use test_utils::thread_local_logger::LocalLogger;
 
 use linearizability::{KvInput, KvModel, KvOp, KvOutput, Operation};
 
@@ -175,7 +176,8 @@ pub fn generic_test(test_params: GenericTestParams) {
 
     let mut laps = vec![];
     const ROUNDS: usize = 3;
-    for _ in 0..ROUNDS {
+    for round in 0..ROUNDS {
+        log::info!("Running round {}", round);
         let start = Instant::now();
         // Network partition thread.
         let partition_stop = Arc::new(AtomicBool::new(false));
@@ -185,8 +187,11 @@ pub fn generic_test(test_params: GenericTestParams) {
         let config = cfg.clone();
         let clients_stop_clone = clients_stop.clone();
         let ops_clone = ops.clone();
+        let logger = LocalLogger::inherit();
         let spawn_client_results = std::thread::spawn(move || {
+            logger.clone().attach();
             spawn_clients(config, clients, move |index: usize, clerk: Clerk| {
+                logger.clone().attach();
                 if !test_linearizability {
                     appending_client(index, clerk, clients_stop_clone.clone())
                 } else {

+ 18 - 1
kvraft/tests/service_test.rs

@@ -11,6 +11,8 @@ use kvraft::testing_utils::config::{
 use kvraft::testing_utils::generic_test::{
     generic_test, spawn_clients, GenericTestParams,
 };
+use test_utils::init_test_log;
+use test_utils::thread_local_logger::LocalLogger;
 
 type Result = std::result::Result<(), String>;
 
@@ -56,6 +58,7 @@ fn check_concurrent_results(
 
 #[test]
 fn basic_service() {
+    init_test_log!();
     generic_test(GenericTestParams {
         clients: 1,
         ..Default::default()
@@ -64,6 +67,7 @@ fn basic_service() {
 
 #[test]
 fn concurrent_client() {
+    init_test_log!();
     generic_test(GenericTestParams {
         clients: 5,
         ..Default::default()
@@ -72,6 +76,7 @@ fn concurrent_client() {
 
 #[test]
 fn unreliable_many_clients() {
+    init_test_log!();
     generic_test(GenericTestParams {
         clients: 5,
         unreliable: true,
@@ -81,6 +86,7 @@ fn unreliable_many_clients() {
 
 #[test]
 fn unreliable_one_key_many_clients() -> Result {
+    init_test_log!();
     const SERVERS: usize = 5;
     let cfg = Arc::new(make_config(SERVERS, true, 0));
     defer!(cfg.clean_up());
@@ -93,8 +99,10 @@ fn unreliable_one_key_many_clients() -> Result {
 
     const CLIENTS: usize = 5;
     const ATTEMPTS: usize = 10;
+    let logger = LocalLogger::inherit();
     let client_results =
-        spawn_clients(cfg.clone(), CLIENTS, |index, mut clerk| {
+        spawn_clients(cfg.clone(), CLIENTS, move |index, mut clerk| {
+            logger.clone().attach();
             for i in 0..ATTEMPTS {
                 clerk.append("k", format!("({}, {})", index, i));
             }
@@ -110,6 +118,7 @@ fn unreliable_one_key_many_clients() -> Result {
 
 #[test]
 fn one_partition() -> Result {
+    init_test_log!();
     const SERVERS: usize = 5;
     let cfg = Arc::new(make_config(SERVERS, false, 0));
     defer!(cfg.clean_up());
@@ -174,6 +183,7 @@ fn one_partition() -> Result {
 
 #[test]
 fn many_partitions_one_client() {
+    init_test_log!();
     generic_test(GenericTestParams {
         clients: 1,
         partition: true,
@@ -183,6 +193,7 @@ fn many_partitions_one_client() {
 
 #[test]
 fn many_partitions_many_client() {
+    init_test_log!();
     generic_test(GenericTestParams {
         clients: 5,
         partition: true,
@@ -192,6 +203,7 @@ fn many_partitions_many_client() {
 
 #[test]
 fn persist_one_client() {
+    init_test_log!();
     generic_test(GenericTestParams {
         clients: 1,
         crash: true,
@@ -201,6 +213,7 @@ fn persist_one_client() {
 
 #[test]
 fn persist_concurrent() {
+    init_test_log!();
     generic_test(GenericTestParams {
         clients: 5,
         crash: true,
@@ -210,6 +223,7 @@ fn persist_concurrent() {
 
 #[test]
 fn persist_concurrent_unreliable() {
+    init_test_log!();
     generic_test(GenericTestParams {
         clients: 5,
         unreliable: true,
@@ -220,6 +234,7 @@ fn persist_concurrent_unreliable() {
 
 #[test]
 fn persist_partition() {
+    init_test_log!();
     generic_test(GenericTestParams {
         clients: 5,
         partition: true,
@@ -230,6 +245,7 @@ fn persist_partition() {
 
 #[test]
 fn persist_partition_unreliable() {
+    init_test_log!();
     generic_test(GenericTestParams {
         clients: 5,
         unreliable: true,
@@ -242,6 +258,7 @@ fn persist_partition_unreliable() {
 
 #[test]
 fn linearizability() {
+    init_test_log!();
     generic_test(GenericTestParams {
         clients: 15,
         unreliable: true,