// regression_tests.rs
use std::sync::Arc;
use std::time::{Duration, Instant};

use ruaft::utils::integration_test::{
    unpack_append_entries_args, unpack_append_entries_reply,
};
use test_configs::interceptor::{make_config, RaftRpcEvent};
use test_utils::init_test_log;
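
/// Smoke test: commits a single key ("commit" -> "consistency") through the
/// intercepted cluster, waits until at least one peer has acknowledged the
/// corresponding AppendEntries, then reads the value back.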
#[test]
fn smoke_test() {
    init_test_log!();
    let server_count = 3;
    let config = make_config(server_count, None);
    let config = Arc::new(config);
    let thread_pool = tokio::runtime::Runtime::new().unwrap();

    let put = thread_pool.spawn(
        config.spawn_put("commit".to_string(), "consistency".to_string()),
    );
    // Release intercepted RPCs until some peer acknowledges an AppendEntries
    // carrying an entry at index >= 1 in the current term.
    let mut responded = false;
    while let Ok((event, handle)) = config.event_queue.receiver.recv() {
        if let RaftRpcEvent::AppendEntriesResponse(args, reply) = event {
            if let Some(index_term) = unpack_append_entries_args(args) {
                let (term, success) = unpack_append_entries_reply(reply);
                if term == index_term.term && success && index_term.index >= 1 {
                    responded = true;
                    break;
                }
            }
        }
        handle.unblock();
    }
    assert!(responded, "At least one peer must have responded OK");
    let result = thread_pool.block_on(put).unwrap();
    assert!(result.is_ok());

    let get = thread_pool.spawn(config.spawn_get("commit".to_string()));
    // Keep releasing RPCs until the read finishes, for at most one second.
    let start = Instant::now();
    while let Ok((_event, handle)) = config
        .event_queue
        .receiver
        .recv_timeout(Duration::from_secs(1))
    {
        if get.is_finished() {
            break;
        }
        if start.elapsed() >= Duration::from_secs(1) {
            break;
        }
        handle.unblock();
    }
    assert!(get.is_finished());
    let value = thread_pool.block_on(get).unwrap().unwrap();
    assert_eq!("consistency", value);
}
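
/// Holds back the replication acknowledgement of a write at the original
/// leader, isolates that leader until a new one is elected, commits a second
/// write to the same key through the new leader, and then checks what a read
/// served by the old leader returns.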
#[test]
fn delayed_commit_consistency_test() {
    init_test_log!();
    let server_count = 3;
    let config = Arc::new(make_config(server_count, None));
    let thread_pool = tokio::runtime::Runtime::new().unwrap();

    let first_write = thread_pool.spawn(
        config.spawn_put("consistency".to_string(), "failed".to_string()),
    );
    // Hold back the first successful AppendEntries response to the leader
    // instead of unblocking it.
    let mut write_handle = None;
    while let Ok((event, handle)) = config.event_queue.receiver.recv() {
        if let RaftRpcEvent::AppendEntriesResponse(args, reply) = event {
            if let Some(index_term) = unpack_append_entries_args(args) {
                let (term, success) = unpack_append_entries_reply(reply);
                if term == index_term.term && success && index_term.index >= 1 {
                    if write_handle.is_none() {
                        write_handle.replace(handle);
                        break;
                    } else {
                        handle.reply_interrupted_error();
                    }
                }
            }
        }
    }
    let write_handle = write_handle.unwrap();
    let leader = write_handle.from;
    assert!(
        config.kv_servers[leader].raft().get_state().1,
        "leader should still be the leader"
    );

    // Block everything from/to the old leader until we see a new leader.
    let mut new_leader = leader;
    while let Ok((_event, handle)) = config.event_queue.receiver.recv() {
        let from = handle.from;
        if from == leader || handle.to == leader {
            handle.reply_interrupted_error();
        } else {
            handle.unblock();
            if config.kv_servers[from].raft().get_state().1 {
                new_leader = from;
                break;
            }
        }
    }
    assert_ne!(new_leader, leader, "A new leader must have been elected");
    // The isolated old leader should still be stuck at term 1.
    assert_eq!(1, config.kv_servers[leader].raft().get_state().0 .0);

    // Commit a second write to the same key through the new leader while the
    // old leader stays cut off.
    let second_write = thread_pool.spawn(config.spawn_put_to_kv(
        new_leader,
        "consistency".to_string(),
        "guaranteed".to_string(),
    ));
    while let Ok((_event, handle)) = config.event_queue.receiver.recv() {
        if handle.from == leader || handle.to == leader {
            handle.reply_interrupted_error();
        } else {
            handle.unblock();
        }
        if second_write.is_finished() {
            break;
        }
    }
    assert_eq!(1, config.kv_servers[leader].raft().get_state().0 .0);
    assert!(second_write.is_finished());
    thread_pool.block_on(second_write).unwrap().unwrap();

    let read = thread_pool
        .spawn(config.spawn_get_from_kv(leader, "consistency".to_string()));
    // Give the kv server some time to start handling the request.
    std::thread::sleep(Duration::from_millis(100));
    // Unblock the held response so that the first write can complete.
    write_handle.unblock();
    thread_pool.block_on(first_write).unwrap().unwrap();
    if let Ok(result) = thread_pool.block_on(read).unwrap() {
        // This is so wrong: the second write ("guaranteed") was successful,
        // yet the old leader still serves the first write's value.
        assert_eq!(result, "failed");
    } else {
        panic!("The read request should not time out");
    }
}