regression_tests.rs 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. use ruaft::utils::integration_test::{
  2. unpack_append_entries_args, unpack_append_entries_reply,
  3. };
  4. use std::sync::Arc;
  5. use std::time::{Duration, Instant};
  6. use test_configs::interceptor::{make_config, RaftRpcEvent};
  7. use test_utils::init_test_log;
  8. #[test]
  9. fn smoke_test() {
  10. init_test_log!();
  11. let server_count = 3;
  12. let config = make_config(server_count, None);
  13. let config = Arc::new(config);
  14. let thread_pool = tokio::runtime::Runtime::new().unwrap();
  15. let put = thread_pool.spawn(
  16. config.spawn_put("commit".to_string(), "consistency".to_string()),
  17. );
  18. let mut responded = false;
  19. while let Ok((event, handle)) = config.event_queue.receiver.recv() {
  20. if let RaftRpcEvent::AppendEntriesResponse(args, reply) = event {
  21. if let Some(index_term) = unpack_append_entries_args(args) {
  22. let (term, success) = unpack_append_entries_reply(reply);
  23. if term == index_term.term && success && index_term.index >= 1 {
  24. responded = true;
  25. break;
  26. }
  27. }
  28. }
  29. handle.unblock();
  30. }
  31. assert!(responded, "At least one peer must have responded OK");
  32. let result = thread_pool.block_on(put).unwrap();
  33. assert!(result.is_ok());
  34. let get = thread_pool.spawn(config.spawn_get("commit".to_string()));
  35. let start = Instant::now();
  36. while let Ok((_event, handle)) = config
  37. .event_queue
  38. .receiver
  39. .recv_timeout(Duration::from_secs(1))
  40. {
  41. if get.is_finished() {
  42. break;
  43. }
  44. if start.elapsed() >= Duration::from_secs(1) {
  45. break;
  46. }
  47. handle.unblock();
  48. }
  49. assert!(get.is_finished());
  50. let value = thread_pool.block_on(get).unwrap().unwrap();
  51. assert_eq!("consistency", value);
  52. }