//! sync_log_entries.rs — daemon that replicates (syncs) log entries from the
//! Raft leader to its peers, falling back to snapshot installation when the
//! required entries have been compacted away.
  1. use std::sync::atomic::{AtomicUsize, Ordering};
  2. use std::sync::Arc;
  3. use parking_lot::{Condvar, Mutex};
  4. use crate::daemon_env::{Daemon, ErrorKind};
  5. use crate::heartbeats::HEARTBEAT_INTERVAL;
  6. use crate::term_marker::TermMarker;
  7. use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
  8. use crate::verify_authority::DaemonBeatTicker;
  9. use crate::{
  10. check_or_record, AppendEntriesArgs, IndexTerm, InstallSnapshotArgs, Peer,
  11. Raft, RaftState, RemoteRaft, ReplicableCommand, Term,
  12. };
  13. #[repr(align(64))]
  14. struct Opening(Arc<AtomicUsize>);
  15. enum SyncLogEntriesOperation<Command> {
  16. AppendEntries(AppendEntriesArgs<Command>),
  17. InstallSnapshot(InstallSnapshotArgs),
  18. None,
  19. }
  20. enum SyncLogEntriesResult {
  21. TermElapsed(Term),
  22. Archived(IndexTerm),
  23. Diverged(IndexTerm),
  24. Success,
  25. }
  26. #[derive(Clone, Copy, Debug)]
  27. struct TaskNumber(usize);
// Command must be
// 0. 'static: Raft<Command> must be 'static; it is moved to another thread.
// 1. Clone: commands are copied to the persister.
// 2. Send: Arc<Mutex<Vec<LogEntry<Command>>>> must be Send; it is moved to another thread.
// 3. Serialize: commands are converted to bytes to persist.
  33. impl<Command: ReplicableCommand> Raft<Command> {
  34. /// Runs a daemon thread that syncs log entries to peers.
  35. ///
  36. /// This daemon watches the `new_log_entry` channel. Each item delivered by
  37. /// the channel is a request to sync log entries to either a peer
  38. /// (`Some(peer_index)`) or all peers (`None`).
  39. ///
  40. /// The daemon tries to collapse requests about the same peer together. A
  41. /// new task is only scheduled when the pending requests number turns from
  42. /// zero to one. Even then there could still be more than one tasks syncing
  43. /// logs to the same peer at the same time.
  44. ///
  45. /// New tasks will still be scheduled when we are not the leader. The task
  46. /// will exist without not do anything in that case.
  47. ///
  48. /// See comments on [`Raft::sync_log_entries`] to learn about the syncing
  49. /// and backoff strategy.
  50. pub(crate) fn run_log_entry_daemon(&mut self) {
  51. let (tx, rx) = std::sync::mpsc::channel::<Option<Peer>>();
  52. let tx = SharedSender::new(tx);
  53. self.new_log_entry.replace(tx);
  54. // Clone everything that the thread needs.
  55. let this = self.clone();
  56. let sync_log_entry_daemon = move || {
  57. log::info!("{:?} sync log entries daemon running ...", this.me);
  58. let mut openings = vec![];
  59. openings.resize_with(this.peers.len(), || {
  60. Opening(Arc::new(AtomicUsize::new(0)))
  61. });
  62. let openings = openings; // Not mutable beyond this point.
  63. let mut task_number = 0;
  64. while let Ok(peer) = rx.recv() {
  65. if !this.keep_running.load(Ordering::Relaxed) {
  66. break;
  67. }
  68. if !this.inner_state.lock().is_leader() {
  69. continue;
  70. }
  71. for (i, rpc_client) in this.peers.iter().enumerate() {
  72. if i != this.me.0 && peer.map(|p| p.0 == i).unwrap_or(true)
  73. {
  74. // Only schedule a new task if the last task has cleared
  75. // the queue of RPC requests.
  76. if openings[i].0.fetch_add(1, Ordering::AcqRel) == 0 {
  77. task_number += 1;
  78. this.thread_pool.spawn(Self::sync_log_entries(
  79. this.inner_state.clone(),
  80. rpc_client.clone(),
  81. i,
  82. this.new_log_entry.clone().unwrap(),
  83. openings[i].0.clone(),
  84. this.apply_command_signal.clone(),
  85. this.term_marker(),
  86. this.beat_ticker(i),
  87. TaskNumber(task_number),
  88. ));
  89. }
  90. }
  91. }
  92. }
  93. log::info!("{:?} sync log entries daemon done.", this.me);
  94. };
  95. self.daemon_env
  96. .watch_daemon(Daemon::SyncLogEntries, sync_log_entry_daemon);
  97. }
  98. /// Syncs log entries to a peer once, requests a new sync if that fails.
  99. ///
  100. /// Sends an `AppendEntries` request if the planned next log entry to sync
  101. /// is after log start (and thus not covered by the log snapshot). Sends an
  102. /// `InstallSnapshot` request otherwise. The responses of those two types of
  103. /// requests are handled in a similar way.
  104. ///
  105. /// The peer might respond with
  106. /// * Success. Updates the internal record of how much log the peer holds.
  107. /// Marks new log entries as committed if we have a quorum of peers that
  108. /// have persisted the log entries. Note that we do not check if there are
  109. /// more items that can be sent to the peer. A new task will be scheduled
  110. /// for that outside of this daemon.
  111. ///
  112. /// * Nothing at all. A new request to sync log entries will be added to the
  113. /// `new_log_entry` queue.
  114. ///
  115. /// * The log has diverged. The peer disagrees with the request. We'll move
  116. /// the "planned next log entry to sync" towards the log start and request
  117. /// to sync again via the `new_log_entry` queue. The backoff will be
  118. /// exponential until it exceeds the log start, at which point the request
  119. /// becomes a `InstallSnapshot`. Note this case is impossible in a response
  120. /// to a `InstallSnapshot` RPC.
  121. ///
  122. /// * The log entry has been archived. The peer has taken a snapshot at that
  123. /// position and thus cannot verify the request. Along with this response,
  124. /// the peer sends back its commit index, which is guaranteed not to be
  125. /// shadowed by that snapshot. The sync will be retried at that commit index
  126. /// via the `new_log_entry` queue. Note the follow up sync could still fail
  127. /// for the same reason, as the peer might have moved its commit index.
  128. /// However syncing will eventually succeed, since the peer cannot move its
  129. /// commit index indefinitely without accepting any log sync requests from
  130. /// the leader.
  131. ///
  132. /// In the last two cases, the "planned next index to sync" can move towards
  133. /// the log start and end, respectively. We will not move back and forth in
  134. /// a mixed sequence of those two failures. The reasoning is that after a
  135. /// failure of the last case, we will never hit the other failure again,
  136. /// since in the last case we always sync log entry at a committed index,
  137. /// and a committed log entry can never diverge.
  138. #[allow(clippy::too_many_arguments)]
  139. async fn sync_log_entries(
  140. rf: Arc<Mutex<RaftState<Command>>>,
  141. rpc_client: impl RemoteRaft<Command>,
  142. peer_index: usize,
  143. rerun: SharedSender<Option<Peer>>,
  144. opening: Arc<AtomicUsize>,
  145. apply_command_signal: Arc<Condvar>,
  146. term_marker: TermMarker<Command>,
  147. beat_ticker: DaemonBeatTicker,
  148. task_number: TaskNumber,
  149. ) {
  150. if opening.swap(0, Ordering::AcqRel) == 0 {
  151. return;
  152. }
  153. let operation =
  154. Self::build_sync_log_entries(&rf, peer_index, task_number);
  155. let (term, prev_log_index, match_index, succeeded) = match operation {
  156. SyncLogEntriesOperation::AppendEntries(args) => {
  157. let term = args.term;
  158. let prev_log_index = args.prev_log_index;
  159. let match_index = args.prev_log_index + args.entries.len();
  160. let succeeded =
  161. Self::append_entries(&rpc_client, args, beat_ticker).await;
  162. (term, prev_log_index, match_index, succeeded)
  163. }
  164. SyncLogEntriesOperation::InstallSnapshot(args) => {
  165. let term = args.term;
  166. let prev_log_index = args.last_included_index;
  167. let match_index = args.last_included_index;
  168. let succeeded =
  169. Self::install_snapshot(&rpc_client, args, beat_ticker)
  170. .await;
  171. (term, prev_log_index, match_index, succeeded)
  172. }
  173. SyncLogEntriesOperation::None => return,
  174. };
  175. let peer = Peer(peer_index);
  176. match succeeded {
  177. Ok(SyncLogEntriesResult::Success) => {
  178. let mut rf = rf.lock();
  179. check_or_record!(
  180. match_index < rf.log.end(),
  181. ErrorKind::LeaderLogShrunk(match_index),
  182. "The leader log shrunk",
  183. &rf
  184. );
  185. if !rf.is_leader() {
  186. return;
  187. }
  188. if rf.current_term != term {
  189. return;
  190. }
  191. rf.next_index[peer_index] = match_index + 1;
  192. rf.current_step[peer_index] = 0;
  193. if match_index > rf.match_index[peer_index] {
  194. rf.match_index[peer_index] = match_index;
  195. let mut matched = rf.match_index.to_vec();
  196. let mid = matched.len() / 2 + 1;
  197. matched.sort_unstable();
  198. let new_commit_index = matched[mid];
  199. if new_commit_index > rf.commit_index
  200. && rf.log.at(new_commit_index).term == rf.current_term
  201. {
  202. log::info!(
  203. "{:?} moving leader commit index to {} in {:?}",
  204. rf.leader_id,
  205. new_commit_index,
  206. task_number
  207. );
  208. // COMMIT_INDEX_INVARIANT, SNAPSHOT_INDEX_INVARIANT:
  209. // Index new_commit_index exists in the log array,
  210. // which implies new_commit_index is in range
  211. // [log.start(), log.end()).
  212. rf.commit_index = new_commit_index;
  213. apply_command_signal.notify_one();
  214. }
  215. }
  216. // After each round of install snapshot, we must schedule
  217. // another round of append entries. The extra round must be run
  218. // even if match_index did not move after the snapshot is
  219. // installed.
  220. // For example,
  221. // 1. Leader committed index 10, received another request at
  222. // index 11.
  223. // 2. Leader sends append entries to all peers.
  224. // 3. Leader commits index 11. At this time, append entries to
  225. // peer X has not returned, while other append entries have.
  226. // 4. Leader snapshots index 11, received another request at
  227. // index 12.
  228. // 5. Leader needs to update peer X, but does not have the
  229. // commit at index 11 any more. Leader then sends install
  230. // snapshot to peer X at index 11.
  231. // 6. The original append entries request to peer X returns
  232. // successfully, moving match_index to 11.
  233. // 7. The install snapshot request returns successfully.
  234. //
  235. // The installed snapshot is at index 11, which is already sent
  236. // by the previous append entries request. Thus the match index
  237. // did not move. Still, we need the extra round of append
  238. // entries to peer X for log entry at index 12.
  239. if prev_log_index == match_index {
  240. // If we did not make any progress this time, try again.
  241. // This can only happen when installing snapshots.
  242. let _ = rerun.send(Some(Peer(peer_index)));
  243. }
  244. }
  245. Ok(SyncLogEntriesResult::Archived(committed)) => {
  246. let mut rf = rf.lock();
  247. check_or_record!(
  248. prev_log_index < committed.index,
  249. ErrorKind::RefusedSnapshotAfterCommitted(
  250. prev_log_index,
  251. committed.index
  252. ),
  253. format!(
  254. "Peer {} misbehaves: claimed log index {} is archived, \
  255. but commit index is at {:?}) which is before that",
  256. peer_index, prev_log_index, committed
  257. ),
  258. &rf
  259. );
  260. Self::check_committed(&rf, peer, committed.clone());
  261. rf.current_step[peer_index] = 0;
  262. // Next index moves towards the log end. This is the only place
  263. // where that happens. committed.index should be between log
  264. // start and end, guaranteed by check_committed() above.
  265. rf.next_index[peer_index] = committed.index + 1;
  266. // Ignore the error. The log syncing thread must have died.
  267. let _ = rerun.send(Some(Peer(peer_index)));
  268. }
  269. Ok(SyncLogEntriesResult::Diverged(committed)) => {
  270. let mut rf = rf.lock();
  271. check_or_record!(
  272. prev_log_index > committed.index,
  273. ErrorKind::DivergedBeforeCommitted(
  274. prev_log_index,
  275. committed.index
  276. ),
  277. format!(
  278. "Peer {} claimed log index {} does not match, \
  279. but commit index is at {:?}) which is after that.",
  280. peer_index, prev_log_index, committed
  281. ),
  282. &rf
  283. );
  284. Self::check_committed(&rf, peer, committed.clone());
  285. let step = &mut rf.current_step[peer_index];
  286. if *step < 5 {
  287. *step += 1;
  288. }
  289. let diff = 4 << *step;
  290. let next_index = &mut rf.next_index[peer_index];
  291. if diff >= *next_index {
  292. *next_index = 1usize;
  293. } else {
  294. *next_index -= diff;
  295. }
  296. if *next_index < committed.index {
  297. *next_index = committed.index;
  298. }
  299. // Ignore the error. The log syncing thread must have died.
  300. let _ = rerun.send(Some(Peer(peer_index)));
  301. }
  302. // Do nothing, not our term anymore.
  303. Ok(SyncLogEntriesResult::TermElapsed(term)) => {
  304. term_marker.mark(term);
  305. }
  306. Err(_) => {
  307. tokio::time::sleep(HEARTBEAT_INTERVAL).await;
  308. // Ignore the error. The log syncing thread must have died.
  309. let _ = rerun.send(Some(Peer(peer_index)));
  310. }
  311. };
  312. }
  313. fn check_committed(
  314. rf: &RaftState<Command>,
  315. peer: Peer,
  316. committed: IndexTerm,
  317. ) {
  318. if committed.index < rf.log.start() {
  319. return;
  320. }
  321. check_or_record!(
  322. committed.index < rf.log.end(),
  323. ErrorKind::CommittedBeyondEnd(committed.index),
  324. format!(
  325. "Follower {:?} committed a log entry {:?} that is
  326. beyond the end of the leader log at {:?}",
  327. peer,
  328. committed,
  329. rf.log.end(),
  330. ),
  331. rf
  332. );
  333. let local_term = rf.log.at(committed.index).term;
  334. check_or_record!(
  335. committed.term == local_term,
  336. ErrorKind::DivergedAtCommitted(committed.index),
  337. format!(
  338. "{:?} committed log diverged at {:?}: {:?} v.s. leader {:?}",
  339. peer, committed.index, committed.term, local_term
  340. ),
  341. rf
  342. );
  343. }
  344. fn build_sync_log_entries(
  345. rf: &Mutex<RaftState<Command>>,
  346. peer_index: usize,
  347. task_number: TaskNumber,
  348. ) -> SyncLogEntriesOperation<Command> {
  349. let rf = rf.lock();
  350. if !rf.is_leader() {
  351. return SyncLogEntriesOperation::None;
  352. }
  353. // To send AppendEntries request, next_index must be strictly larger
  354. // than start(). Otherwise we won't be able to know the log term of the
  355. // entry right before next_index.
  356. if rf.next_index[peer_index] > rf.log.start() {
  357. if rf.next_index[peer_index] < rf.log.end() {
  358. log::debug!(
  359. "{:?} building append entries {:?} from {} to {:?}",
  360. rf.leader_id,
  361. task_number,
  362. rf.next_index[peer_index] - 1,
  363. Peer(peer_index)
  364. );
  365. SyncLogEntriesOperation::AppendEntries(
  366. Self::build_append_entries(&rf, peer_index),
  367. )
  368. } else {
  369. log::debug!(
  370. "{:?} nothing in append entries {:?} to {:?}",
  371. rf.leader_id,
  372. task_number,
  373. Peer(peer_index)
  374. );
  375. SyncLogEntriesOperation::None
  376. }
  377. } else {
  378. log::debug!(
  379. "{:?} installing snapshot {:?} at {} to {:?}",
  380. rf.leader_id,
  381. task_number,
  382. rf.log.first_index_term().index,
  383. Peer(peer_index)
  384. );
  385. SyncLogEntriesOperation::InstallSnapshot(
  386. Self::build_install_snapshot(&rf),
  387. )
  388. }
  389. }
  390. fn build_append_entries(
  391. rf: &RaftState<Command>,
  392. peer_index: usize,
  393. ) -> AppendEntriesArgs<Command> {
  394. // It is guaranteed that next_index <= rf.log.end(). Panic otherwise.
  395. let prev_log_index = rf.next_index[peer_index] - 1;
  396. let prev_log_term = rf.log.at(prev_log_index).term;
  397. AppendEntriesArgs {
  398. term: rf.current_term,
  399. leader_id: rf.leader_id,
  400. prev_log_index,
  401. prev_log_term,
  402. entries: rf.log.after(rf.next_index[peer_index]).to_vec(),
  403. leader_commit: rf.commit_index,
  404. }
  405. }
  406. const APPEND_ENTRIES_RETRY: usize = 1;
  407. async fn append_entries(
  408. rpc_client: &dyn RemoteRaft<Command>,
  409. args: AppendEntriesArgs<Command>,
  410. beat_ticker: DaemonBeatTicker,
  411. ) -> std::io::Result<SyncLogEntriesResult> {
  412. let term = args.term;
  413. let beat = beat_ticker.next_beat();
  414. let reply = retry_rpc(
  415. Self::APPEND_ENTRIES_RETRY,
  416. RPC_DEADLINE,
  417. move |_round| rpc_client.append_entries(args.clone()),
  418. )
  419. .await?;
  420. Ok(if reply.term == term {
  421. beat_ticker.tick(beat);
  422. if let Some(committed) = reply.committed {
  423. if reply.success {
  424. SyncLogEntriesResult::Archived(committed)
  425. } else {
  426. SyncLogEntriesResult::Diverged(committed)
  427. }
  428. } else {
  429. SyncLogEntriesResult::Success
  430. }
  431. } else {
  432. SyncLogEntriesResult::TermElapsed(reply.term)
  433. })
  434. }
  435. fn build_install_snapshot(rf: &RaftState<Command>) -> InstallSnapshotArgs {
  436. let (last, snapshot) = rf.log.snapshot();
  437. InstallSnapshotArgs {
  438. term: rf.current_term,
  439. leader_id: rf.leader_id,
  440. last_included_index: last.index,
  441. last_included_term: last.term,
  442. data: snapshot.to_owned(),
  443. offset: 0,
  444. done: true,
  445. }
  446. }
  447. const INSTALL_SNAPSHOT_RETRY: usize = 1;
  448. async fn install_snapshot(
  449. rpc_client: &dyn RemoteRaft<Command>,
  450. args: InstallSnapshotArgs,
  451. beat_ticker: DaemonBeatTicker,
  452. ) -> std::io::Result<SyncLogEntriesResult> {
  453. let term = args.term;
  454. let beat = beat_ticker.next_beat();
  455. let reply = retry_rpc(
  456. Self::INSTALL_SNAPSHOT_RETRY,
  457. RPC_DEADLINE,
  458. move |_round| rpc_client.install_snapshot(args.clone()),
  459. )
  460. .await?;
  461. Ok(if reply.term == term {
  462. beat_ticker.tick(beat);
  463. if let Some(committed) = reply.committed {
  464. SyncLogEntriesResult::Archived(committed)
  465. } else {
  466. SyncLogEntriesResult::Success
  467. }
  468. } else {
  469. SyncLogEntriesResult::TermElapsed(reply.term)
  470. })
  471. }
  472. }