regression_tests.rs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. use std::sync::Arc;
  2. use std::time::{Duration, Instant};
  3. use ruaft::utils::integration_test::{
  4. unpack_append_entries_args, unpack_append_entries_reply,
  5. };
  6. use test_configs::interceptor::{make_config, RaftRpcEvent};
  7. use test_utils::init_test_log;
  8. #[tokio::test(flavor = "multi_thread")]
  9. async fn smoke_test() {
  10. init_test_log!();
  11. let server_count = 3;
  12. let config = make_config(server_count, None);
  13. let config = Arc::new(config);
  14. let put = tokio::spawn(
  15. config.spawn_put("commit".to_string(), "consistency".to_string()),
  16. );
  17. let mut responded = false;
  18. while let Ok((event, handle)) = config.event_queue.receiver.recv() {
  19. if let RaftRpcEvent::AppendEntriesResponse(args, reply) = event {
  20. if let Some(index_term) = unpack_append_entries_args(args) {
  21. let (term, success) = unpack_append_entries_reply(reply);
  22. if term == index_term.term && success && index_term.index >= 1 {
  23. responded = true;
  24. break;
  25. }
  26. }
  27. }
  28. handle.unblock();
  29. }
  30. assert!(responded, "At least one peer must have responded OK");
  31. let result = put.await.unwrap();
  32. assert!(result.is_ok());
  33. let get = tokio::spawn(config.spawn_get("commit".to_string()));
  34. let start = Instant::now();
  35. while let Ok((_event, handle)) = config
  36. .event_queue
  37. .receiver
  38. .recv_timeout(Duration::from_secs(1))
  39. {
  40. if get.is_finished() {
  41. break;
  42. }
  43. if start.elapsed() >= Duration::from_secs(1) {
  44. break;
  45. }
  46. handle.unblock();
  47. }
  48. assert!(get.is_finished());
  49. let value = get.await.unwrap().unwrap();
  50. assert_eq!("consistency", value);
  51. }
  52. #[tokio::test(flavor = "multi_thread")]
  53. async fn delayed_commit_consistency_test() {
  54. init_test_log!();
  55. let server_count = 3;
  56. let config = Arc::new(make_config(server_count, None));
  57. let first_write = tokio::spawn(
  58. config.spawn_put("consistency".to_string(), "failed".to_string()),
  59. );
  60. let mut write_handle = None;
  61. while let Ok((event, handle)) = config.event_queue.receiver.recv() {
  62. if let RaftRpcEvent::AppendEntriesResponse(args, reply) = event {
  63. if let Some(index_term) = unpack_append_entries_args(args) {
  64. let (term, success) = unpack_append_entries_reply(reply);
  65. if term == index_term.term && success && index_term.index >= 1 {
  66. if write_handle.is_none() {
  67. write_handle.replace(handle);
  68. break;
  69. } else {
  70. handle.reply_interrupted_error();
  71. }
  72. }
  73. }
  74. }
  75. }
  76. let write_handle = write_handle.unwrap();
  77. let leader = write_handle.from;
  78. assert!(
  79. config.kv_servers[leader].raft().get_state().1,
  80. "leader should still be leader"
  81. );
  82. // Block everything from/to leader until we see a new leader.
  83. let mut new_leader = leader;
  84. while let Ok((_event, handle)) = config.event_queue.receiver.recv() {
  85. let from = handle.from;
  86. if from == leader || handle.to == leader {
  87. handle.reply_interrupted_error();
  88. } else {
  89. handle.unblock();
  90. if config.kv_servers[from].raft().get_state().1 {
  91. new_leader = from;
  92. break;
  93. }
  94. }
  95. }
  96. assert_ne!(new_leader, leader, "A new leader must have been elected");
  97. assert_eq!(1, config.kv_servers[leader].raft().get_state().0 .0);
  98. let second_write = tokio::spawn(config.spawn_put_to_kv(
  99. new_leader,
  100. "consistency".to_string(),
  101. "guaranteed".to_string(),
  102. ));
  103. while let Ok((_event, handle)) = config.event_queue.receiver.recv() {
  104. if handle.from == leader || handle.to == leader {
  105. handle.reply_interrupted_error();
  106. } else {
  107. handle.unblock();
  108. }
  109. if second_write.is_finished() {
  110. break;
  111. }
  112. }
  113. assert_eq!(1, config.kv_servers[leader].raft().get_state().0 .0);
  114. assert!(second_write.is_finished());
  115. second_write.await.unwrap().unwrap();
  116. let read = tokio::spawn(
  117. config.spawn_get_from_kv(leader, "consistency".to_string()),
  118. );
  119. // Spare kv server some time to handle the request.
  120. std::thread::sleep(Duration::from_millis(100));
  121. // Unblocks the write response.
  122. write_handle.unblock();
  123. first_write.await.unwrap().unwrap();
  124. assert!(read.await.unwrap().is_err());
  125. }