sync_log_entries.rs 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. use std::sync::atomic::{AtomicUsize, Ordering};
  2. use std::sync::Arc;
  3. use std::time::Duration;
  4. use parking_lot::{Condvar, Mutex};
  5. use crate::check_or_record;
  6. use crate::daemon_env::{Daemon, ErrorKind};
  7. use crate::index_term::IndexTerm;
  8. use crate::term_marker::TermMarker;
  9. use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
  10. use crate::{
  11. AppendEntriesArgs, InstallSnapshotArgs, Peer, Raft, RaftState, RemoteRaft,
  12. Term, HEARTBEAT_INTERVAL_MILLIS,
  13. };
/// Per-peer counter of pending sync requests.
///
/// A new sync task is only scheduled when the counter goes from zero to
/// non-zero (`fetch_add` in `run_log_entry_daemon`); the running task swaps
/// it back to zero before building its request (`sync_log_entries`), thereby
/// collapsing all requests queued in between into one RPC.
///
/// NOTE(review): `align(64)` pads each counter to a 64-byte boundary —
/// presumably one cache line, to avoid false sharing between adjacent peers'
/// counters in the daemon's `openings` vector; confirm target cache-line size.
#[repr(align(64))]
struct Opening(Arc<AtomicUsize>);
/// The RPC a sync task decides to send to a peer, chosen by
/// `build_sync_log_entries`.
enum SyncLogEntriesOperation<Command> {
    /// Send log entries starting at the peer's recorded `next_index`.
    AppendEntries(AppendEntriesArgs<Command>),
    /// The entries the peer needs are shadowed by the local snapshot;
    /// send the snapshot instead.
    InstallSnapshot(InstallSnapshotArgs),
    /// Nothing to send, e.g. we are no longer the leader.
    None,
}
/// Outcome of one `AppendEntries` / `InstallSnapshot` exchange, derived from
/// the peer's reply by `append_entries` / `install_snapshot`.
enum SyncLogEntriesResult {
    /// The peer replied with a different (newer) term; our term has elapsed.
    TermElapsed(Term),
    /// The peer has archived (snapshotted past) the requested index. Carries
    /// the peer's commit index, the point to retry the sync from.
    Archived(IndexTerm),
    /// The peer's log diverges from ours at the requested index. Carries the
    /// peer's commit index, a lower bound for the backoff.
    Diverged(IndexTerm),
    /// The peer accepted the entries or the snapshot.
    Success,
}
  27. // Command must be
  28. // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
  29. // 1. clone: they are copied to the persister.
  30. // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
  31. // 3. serialize: they are converted to bytes to persist.
  32. // 4. default: a default value is used as the first element of log.
  33. impl<Command> Raft<Command>
  34. where
  35. Command: 'static + Clone + Send + serde::Serialize + Default,
  36. {
    /// Runs a daemon thread that syncs log entries to peers.
    ///
    /// This daemon watches the `new_log_entry` channel. Each item delivered by
    /// the channel is a request to sync log entries to either a peer
    /// (`Some(peer_index)`) or all peers (`None`).
    ///
    /// The daemon tries to collapse requests about the same peer together. A
    /// new task is only scheduled when the pending requests number turns from
    /// zero to one. Even then there could still be more than one task syncing
    /// logs to the same peer at the same time.
    ///
    /// New tasks will still be scheduled when we are not the leader. The task
    /// will exit without doing anything in that case.
    ///
    /// See comments on [`Raft::sync_log_entries`] to learn about the syncing
    /// and backoff strategy.
    pub(crate) fn run_log_entry_daemon(&mut self) {
        // The sender is published on `self` so other parts of Raft can
        // request syncs; the receiver stays with the daemon thread.
        let (tx, rx) = std::sync::mpsc::channel::<Option<Peer>>();
        let tx = SharedSender::new(tx);
        self.new_log_entry.replace(tx);
        // Clone everything that the thread needs.
        let this = self.clone();
        let join_handle = std::thread::spawn(move || {
            // Note: do not change this to `let _ = ...`.
            // The scope guard must stay alive for the whole thread body;
            // `let _ = ...` would drop it immediately.
            let _guard = this.daemon_env.for_scope();
            log::info!("{:?} sync log entries daemon running ...", this.me);
            // One pending-request counter per peer. The slot for `this.me`
            // exists but is never used (see the `i != this.me.0` check).
            let mut openings = vec![];
            openings.resize_with(this.peers.len(), || {
                Opening(Arc::new(AtomicUsize::new(0)))
            });
            let openings = openings; // Not mutable beyond this point.
            while let Ok(peer) = rx.recv() {
                if !this.keep_running.load(Ordering::SeqCst) {
                    break;
                }
                if !this.inner_state.lock().is_leader() {
                    continue;
                }
                for (i, rpc_client) in this.peers.iter().enumerate() {
                    // `peer == None` requests a sync to every peer.
                    if i != this.me.0 && peer.map(|p| p.0 == i).unwrap_or(true)
                    {
                        // Only schedule a new task if the last task has cleared
                        // the queue of RPC requests.
                        if openings[i].0.fetch_add(1, Ordering::SeqCst) == 0 {
                            this.thread_pool.spawn(Self::sync_log_entries(
                                this.inner_state.clone(),
                                rpc_client.clone(),
                                i,
                                this.new_log_entry.clone().unwrap(),
                                openings[i].0.clone(),
                                this.apply_command_signal.clone(),
                                this.term_marker(),
                            ));
                        }
                    }
                }
            }
            log::info!("{:?} sync log entries daemon done.", this.me);
            let stop_wait_group = this.stop_wait_group.clone();
            // Making sure the rest of `this` is dropped before the wait group.
            drop(this);
            drop(stop_wait_group);
        });
        self.daemon_env
            .watch_daemon(Daemon::SyncLogEntries, join_handle);
    }
  103. /// Syncs log entries to a peer once, requests a new sync if that fails.
  104. ///
  105. /// Sends an `AppendEntries` request if the planned next log entry to sync
  106. /// is after log start (and thus not covered by the log snapshot). Sends an
  107. /// `InstallSnapshot` request otherwise. The responses of those two types of
  108. /// requests are handled in a similar way.
  109. ///
  110. /// The peer might respond with
  111. /// * Success. Updates the internal record of how much log the peer holds.
  112. /// Marks new log entries as committed if we have a quorum of peers that
  113. /// have persisted the log entries. Note that we do not check if there are
  114. /// more items that can be sent to the peer. A new task will be scheduled
  115. /// for that outside of this daemon.
  116. ///
  117. /// * Nothing at all. A new request to sync log entries will be added to the
  118. /// `new_log_entry` queue.
  119. ///
  120. /// * The log has diverged. The peer disagrees with the request. We'll move
  121. /// the "planned next log entry to sync" towards the log start and request
  122. /// to sync again via the `new_log_entry` queue. The backoff will be
  123. /// exponential until it exceeds the log start, at which point the request
  124. /// becomes a `InstallSnapshot`. Note this case is impossible in a response
  125. /// to a `InstallSnapshot` RPC.
  126. ///
  127. /// * The log entry has been archived. The peer has taken a snapshot at that
  128. /// position and thus cannot verify the request. Along with this response,
  129. /// the peer sends back its commit index, which is guaranteed not to be
  130. /// shadowed by that snapshot. The sync will be retried at that commit index
  131. /// via the `new_log_entry` queue. Note the follow up sync could still fail
  132. /// for the same reason, as the peer might have moved its commit index.
  133. /// However syncing will eventually succeed, since the peer cannot move its
  134. /// commit index indefinitely without accepting any log sync requests from
  135. /// the leader.
  136. ///
  137. /// In the last two cases, the "planned next index to sync" can move towards
  138. /// the log start and end, respectively. We will not move back and forth in
  139. /// a mixed sequence of those two failures. The reasoning is that after a
  140. /// failure of the last case, we will never hit the other failure again,
  141. /// since in the last case we always sync log entry at a committed index,
  142. /// and a committed log entry can never diverge.
  143. async fn sync_log_entries(
  144. rf: Arc<Mutex<RaftState<Command>>>,
  145. rpc_client: impl RemoteRaft<Command>,
  146. peer_index: usize,
  147. rerun: SharedSender<Option<Peer>>,
  148. opening: Arc<AtomicUsize>,
  149. apply_command_signal: Arc<Condvar>,
  150. term_marker: TermMarker<Command>,
  151. ) {
  152. if opening.swap(0, Ordering::SeqCst) == 0 {
  153. return;
  154. }
  155. let operation = Self::build_sync_log_entries(&rf, peer_index);
  156. let (term, prev_log_index, match_index, succeeded) = match operation {
  157. SyncLogEntriesOperation::AppendEntries(args) => {
  158. let term = args.term;
  159. let prev_log_index = args.prev_log_index;
  160. let match_index = args.prev_log_index + args.entries.len();
  161. let succeeded = Self::append_entries(&rpc_client, args).await;
  162. (term, prev_log_index, match_index, succeeded)
  163. }
  164. SyncLogEntriesOperation::InstallSnapshot(args) => {
  165. let term = args.term;
  166. let prev_log_index = args.last_included_index;
  167. let match_index = args.last_included_index;
  168. let succeeded = Self::install_snapshot(&rpc_client, args).await;
  169. (term, prev_log_index, match_index, succeeded)
  170. }
  171. SyncLogEntriesOperation::None => return,
  172. };
  173. let peer = Peer(peer_index);
  174. match succeeded {
  175. Ok(SyncLogEntriesResult::Success) => {
  176. let mut rf = rf.lock();
  177. if rf.current_term != term {
  178. return;
  179. }
  180. check_or_record!(
  181. match_index < rf.log.end(),
  182. ErrorKind::LeaderLogShrunk(match_index),
  183. "The leader log shrunk",
  184. &rf
  185. );
  186. rf.next_index[peer_index] = match_index + 1;
  187. rf.current_step[peer_index] = 0;
  188. if match_index > rf.match_index[peer_index] {
  189. rf.match_index[peer_index] = match_index;
  190. if rf.is_leader() && rf.current_term == term {
  191. let mut matched = rf.match_index.to_vec();
  192. let mid = matched.len() / 2 + 1;
  193. matched.sort_unstable();
  194. let new_commit_index = matched[mid];
  195. if new_commit_index > rf.commit_index
  196. && rf.log.at(new_commit_index).term
  197. == rf.current_term
  198. {
  199. // COMMIT_INDEX_INVARIANT, SNAPSHOT_INDEX_INVARIANT:
  200. // Index new_commit_index exists in the log array,
  201. // which implies new_commit_index is in range
  202. // [log.start(), log.end()).
  203. rf.commit_index = new_commit_index;
  204. apply_command_signal.notify_one();
  205. }
  206. }
  207. }
  208. }
  209. Ok(SyncLogEntriesResult::Archived(committed)) => {
  210. let mut rf = rf.lock();
  211. check_or_record!(
  212. prev_log_index < committed.index,
  213. ErrorKind::RefusedSnapshotAfterCommitted(
  214. prev_log_index,
  215. committed.index
  216. ),
  217. format!(
  218. "Peer {} misbehaves: claimed log index {} is archived, \
  219. but commit index is at {:?}) which is before that",
  220. peer_index, prev_log_index, committed
  221. ),
  222. &rf
  223. );
  224. Self::check_committed(&rf, peer, committed.clone());
  225. rf.current_step[peer_index] = 0;
  226. // Next index moves towards the log end. This is the only place
  227. // where that happens. committed.index should be between log
  228. // start and end, guaranteed by check_committed() above.
  229. rf.next_index[peer_index] = committed.index;
  230. // Ignore the error. The log syncing thread must have died.
  231. let _ = rerun.send(Some(Peer(peer_index)));
  232. }
  233. Ok(SyncLogEntriesResult::Diverged(committed)) => {
  234. let mut rf = rf.lock();
  235. check_or_record!(
  236. prev_log_index > committed.index,
  237. ErrorKind::DivergedBeforeCommitted(
  238. prev_log_index,
  239. committed.index
  240. ),
  241. format!(
  242. "Peer {} claimed log index {} does not match, \
  243. but commit index is at {:?}) which is after that.",
  244. peer_index, prev_log_index, committed
  245. ),
  246. &rf
  247. );
  248. Self::check_committed(&rf, peer, committed.clone());
  249. let step = &mut rf.current_step[peer_index];
  250. if *step < 5 {
  251. *step += 1;
  252. }
  253. let diff = 4 << *step;
  254. let next_index = &mut rf.next_index[peer_index];
  255. if diff >= *next_index {
  256. *next_index = 1usize;
  257. } else {
  258. *next_index -= diff;
  259. }
  260. if *next_index < committed.index {
  261. *next_index = committed.index;
  262. }
  263. // Ignore the error. The log syncing thread must have died.
  264. let _ = rerun.send(Some(Peer(peer_index)));
  265. }
  266. // Do nothing, not our term anymore.
  267. Ok(SyncLogEntriesResult::TermElapsed(term)) => {
  268. term_marker.mark(term);
  269. }
  270. Err(_) => {
  271. tokio::time::sleep(Duration::from_millis(
  272. HEARTBEAT_INTERVAL_MILLIS,
  273. ))
  274. .await;
  275. // Ignore the error. The log syncing thread must have died.
  276. let _ = rerun.send(Some(Peer(peer_index)));
  277. }
  278. };
  279. }
    /// Sanity-checks a commit index reported by a peer against the local log.
    ///
    /// Records a fatal error (via `check_or_record!`) if the peer's committed
    /// entry lies beyond our log end, or if its term disagrees with the entry
    /// we hold at the same index — a committed entry must never diverge.
    /// Indexes below our log start cannot be verified (they are shadowed by
    /// the snapshot) and are silently accepted.
    fn check_committed(
        rf: &RaftState<Command>,
        peer: Peer,
        committed: IndexTerm,
    ) {
        // Shadowed by our snapshot; nothing to compare against.
        if committed.index < rf.log.start() {
            return;
        }
        check_or_record!(
            committed.index < rf.log.end(),
            ErrorKind::CommittedBeyondEnd(committed.index),
            format!(
                "Follower {:?} committed a log entry {:?} that is
                beyond the end of the leader log at {:?}",
                peer,
                committed,
                rf.log.end(),
            ),
            rf
        );
        let local_term = rf.log.at(committed.index).term;
        check_or_record!(
            committed.term == local_term,
            ErrorKind::DivergedAtCommitted(committed.index),
            format!(
                "{:?} committed log diverged at {:?}: {:?} v.s. leader {:?}",
                peer, committed.index, committed.term, local_term
            ),
            rf
        );
    }
  311. fn build_sync_log_entries(
  312. rf: &Mutex<RaftState<Command>>,
  313. peer_index: usize,
  314. ) -> SyncLogEntriesOperation<Command> {
  315. let rf = rf.lock();
  316. if !rf.is_leader() {
  317. return SyncLogEntriesOperation::None;
  318. }
  319. // To send AppendEntries request, next_index must be strictly larger
  320. // than start(). Otherwise we won't be able to know the log term of the
  321. // entry right before next_index.
  322. if rf.next_index[peer_index] > rf.log.start() {
  323. SyncLogEntriesOperation::AppendEntries(Self::build_append_entries(
  324. &rf, peer_index,
  325. ))
  326. } else {
  327. SyncLogEntriesOperation::InstallSnapshot(
  328. Self::build_install_snapshot(&rf),
  329. )
  330. }
  331. }
  332. fn build_append_entries(
  333. rf: &RaftState<Command>,
  334. peer_index: usize,
  335. ) -> AppendEntriesArgs<Command> {
  336. // It is guaranteed that next_index <= rf.log.end(). Panic otherwise.
  337. let prev_log_index = rf.next_index[peer_index] - 1;
  338. let prev_log_term = rf.log.at(prev_log_index).term;
  339. AppendEntriesArgs {
  340. term: rf.current_term,
  341. leader_id: rf.leader_id,
  342. prev_log_index,
  343. prev_log_term,
  344. entries: rf.log.after(rf.next_index[peer_index]).to_vec(),
  345. leader_commit: rf.commit_index,
  346. }
  347. }
  348. const APPEND_ENTRIES_RETRY: usize = 1;
  349. async fn append_entries(
  350. rpc_client: &dyn RemoteRaft<Command>,
  351. args: AppendEntriesArgs<Command>,
  352. ) -> std::io::Result<SyncLogEntriesResult> {
  353. let term = args.term;
  354. let reply = retry_rpc(
  355. Self::APPEND_ENTRIES_RETRY,
  356. RPC_DEADLINE,
  357. move |_round| rpc_client.append_entries(args.clone()),
  358. )
  359. .await?;
  360. Ok(if reply.term == term {
  361. if let Some(committed) = reply.committed {
  362. if reply.success {
  363. SyncLogEntriesResult::Archived(committed)
  364. } else {
  365. SyncLogEntriesResult::Diverged(committed)
  366. }
  367. } else {
  368. SyncLogEntriesResult::Success
  369. }
  370. } else {
  371. SyncLogEntriesResult::TermElapsed(reply.term)
  372. })
  373. }
  374. fn build_install_snapshot(rf: &RaftState<Command>) -> InstallSnapshotArgs {
  375. let (last, snapshot) = rf.log.snapshot();
  376. InstallSnapshotArgs {
  377. term: rf.current_term,
  378. leader_id: rf.leader_id,
  379. last_included_index: last.index,
  380. last_included_term: last.term,
  381. data: snapshot.to_owned(),
  382. offset: 0,
  383. done: true,
  384. }
  385. }
  386. const INSTALL_SNAPSHOT_RETRY: usize = 1;
  387. async fn install_snapshot(
  388. rpc_client: &dyn RemoteRaft<Command>,
  389. args: InstallSnapshotArgs,
  390. ) -> std::io::Result<SyncLogEntriesResult> {
  391. let term = args.term;
  392. let reply = retry_rpc(
  393. Self::INSTALL_SNAPSHOT_RETRY,
  394. RPC_DEADLINE,
  395. move |_round| rpc_client.install_snapshot(args.clone()),
  396. )
  397. .await?;
  398. Ok(if reply.term == term {
  399. if let Some(committed) = reply.committed {
  400. SyncLogEntriesResult::Archived(committed)
  401. } else {
  402. SyncLogEntriesResult::Success
  403. }
  404. } else {
  405. SyncLogEntriesResult::TermElapsed(reply.term)
  406. })
  407. }
  408. }