regression_tests.rs 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. use std::sync::Arc;
  2. use std::time::{Duration, Instant};
  3. use ruaft::utils::integration_test::{
  4. unpack_append_entries_args, unpack_append_entries_reply,
  5. };
  6. use test_configs::interceptor::{make_config, RaftRpcEvent};
  7. #[tokio::test(flavor = "multi_thread")]
  8. async fn smoke_test() {
  9. test_utils::init_log("regression_tests-smoke_test")
  10. .expect("Initializing test log should never fail");
  11. let server_count = 3;
  12. let config = make_config(server_count, None);
  13. let config = Arc::new(config);
  14. let put = tokio::spawn(
  15. config.spawn_put("commit".to_string(), "consistency".to_string()),
  16. );
  17. let mut responded = false;
  18. while let Ok((event, handle)) = config.event_queue.receiver.recv() {
  19. if let RaftRpcEvent::AppendEntriesResponse(args, reply) = event {
  20. if let Some(index_term) = unpack_append_entries_args(args) {
  21. let (term, success) = unpack_append_entries_reply(reply);
  22. if term == index_term.term && success && index_term.index >= 1 {
  23. responded = true;
  24. break;
  25. }
  26. }
  27. }
  28. handle.unblock();
  29. }
  30. assert!(responded, "At least one peer must have responded OK");
  31. let result = put.await.unwrap();
  32. assert!(result.is_ok());
  33. let get = tokio::spawn(config.spawn_get("commit".to_string()));
  34. let start = Instant::now();
  35. while let Ok((_event, handle)) = config
  36. .event_queue
  37. .receiver
  38. .recv_timeout(Duration::from_secs(1))
  39. {
  40. if get.is_finished() {
  41. break;
  42. }
  43. if start.elapsed() >= Duration::from_secs(1) {
  44. break;
  45. }
  46. handle.unblock();
  47. }
  48. assert!(get.is_finished());
  49. let value = get.await.unwrap().unwrap();
  50. assert_eq!("consistency", value);
  51. }
  52. #[tokio::test(flavor = "multi_thread")]
  53. async fn delayed_commit_consistency_test() {
  54. test_utils::init_log("regression_tests-delayed_commit_consistency_test")
  55. .expect("Initializing test log should never fail");
  56. let server_count = 3;
  57. let config = Arc::new(make_config(server_count, None));
  58. let first_write = tokio::spawn(
  59. config.spawn_put("consistency".to_string(), "failed".to_string()),
  60. );
  61. let mut write_handle = None;
  62. while let Ok((event, handle)) = config.event_queue.receiver.recv() {
  63. if let RaftRpcEvent::AppendEntriesResponse(args, reply) = event {
  64. if let Some(index_term) = unpack_append_entries_args(args) {
  65. let (term, success) = unpack_append_entries_reply(reply);
  66. if term == index_term.term && success && index_term.index >= 1 {
  67. if write_handle.is_none() {
  68. write_handle.replace(handle);
  69. break;
  70. } else {
  71. handle.reply_interrupted_error();
  72. }
  73. }
  74. }
  75. }
  76. }
  77. let write_handle = write_handle.unwrap();
  78. let leader = write_handle.from;
  79. assert!(
  80. config.kv_servers[leader].raft().get_state().1,
  81. "leader should still be leader"
  82. );
  83. // Block everything from/to leader until we see a new leader.
  84. let mut new_leader = leader;
  85. while let Ok((_event, handle)) = config.event_queue.receiver.recv() {
  86. let from = handle.from;
  87. if from == leader || handle.to == leader {
  88. handle.reply_interrupted_error();
  89. } else {
  90. handle.unblock();
  91. if config.kv_servers[from].raft().get_state().1 {
  92. new_leader = from;
  93. break;
  94. }
  95. }
  96. }
  97. assert_ne!(new_leader, leader, "A new leader must have been elected");
  98. assert_eq!(1, config.kv_servers[leader].raft().get_state().0 .0);
  99. let second_write = tokio::spawn(config.spawn_put_to_kv(
  100. new_leader,
  101. "consistency".to_string(),
  102. "guaranteed".to_string(),
  103. ));
  104. while let Ok((_event, handle)) = config.event_queue.receiver.recv() {
  105. if handle.from == leader || handle.to == leader {
  106. handle.reply_interrupted_error();
  107. } else {
  108. handle.unblock();
  109. }
  110. if second_write.is_finished() {
  111. break;
  112. }
  113. }
  114. assert_eq!(1, config.kv_servers[leader].raft().get_state().0 .0);
  115. assert!(second_write.is_finished());
  116. second_write.await.unwrap().unwrap();
  117. let read = tokio::spawn(
  118. config.spawn_get_from_kv(leader, "consistency".to_string()),
  119. );
  120. // Spare kv server some time to handle the request.
  121. std::thread::sleep(Duration::from_millis(100));
  122. // Unblocks the write response.
  123. write_handle.unblock();
  124. first_write.await.unwrap().unwrap();
  125. assert!(read.await.unwrap().is_err());
  126. }