// sync_log_entries.rs
  1. use std::sync::atomic::Ordering;
  2. use std::sync::Arc;
  3. use parking_lot::{Condvar, Mutex};
  4. use crate::daemon_env::ErrorKind;
  5. use crate::heartbeats::HEARTBEAT_INTERVAL;
  6. use crate::peer_progress::PeerProgress;
  7. use crate::remote_context::RemoteContext;
  8. use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
  9. use crate::verify_authority::DaemonBeatTicker;
  10. use crate::{
  11. check_or_record, AppendEntriesArgs, Index, IndexTerm, InstallSnapshotArgs,
  12. Peer, Raft, RaftState, RemoteRaft, ReplicableCommand, Term,
  13. };
/// Internal events consumed by the sync-log-entries daemon.
#[derive(Eq, PartialEq)]
enum Event {
    /// A new term started: carries the new term and the index at which
    /// every peer's syncing progress should be reset.
    NewTerm(Term, Index),
    /// New log entries up to the given index should be synced to all peers.
    NewLogEntry(Index),
    /// Re-run syncing for one specific peer (e.g. after a failed RPC).
    Rerun(Peer),
    /// Ask the daemon to stop.
    Shutdown,
}
  21. impl Event {
  22. fn should_schedule(&self, peer: Peer) -> bool {
  23. match self {
  24. Event::NewTerm(..) => true,
  25. Event::NewLogEntry(_index) => true,
  26. Event::Rerun(p) => p == &peer,
  27. Event::Shutdown => false,
  28. }
  29. }
  30. }
/// Cheaply-cloneable handle for sending scheduling events to the sync
/// log entries daemon.
#[derive(Clone)]
pub(crate) struct SyncLogEntriesComms {
    // Sender half of the event channel drained by SyncLogEntriesDaemon.
    tx: crate::utils::SharedSender<Event>,
}
  35. impl SyncLogEntriesComms {
  36. pub fn update_followers(&self, index: Index) {
  37. // Ignore the error. The log syncing thread must have died.
  38. let _ = self.tx.send(Event::NewLogEntry(index));
  39. }
  40. pub fn reset_progress(&self, term: Term, index: Index) {
  41. let index = std::cmp::max(index, 1);
  42. let _ = self.tx.send(Event::NewTerm(term, index));
  43. }
  44. pub fn kill(&self) {
  45. // The sync log entry daemon might have exited.
  46. let _ = self.tx.send(Event::Shutdown);
  47. }
  48. fn rerun(&self, peer: Peer) {
  49. // Ignore the error. The log syncing thread must have died.
  50. let _ = self.tx.send(Event::Rerun(peer));
  51. }
  52. }
/// State owned by the sync-log-entries daemon thread.
pub(crate) struct SyncLogEntriesDaemon {
    // Receiver half of the event channel fed by SyncLogEntriesComms.
    rx: std::sync::mpsc::Receiver<Event>,
    // Per-peer syncing progress, indexed by peer index.
    peer_progress: Vec<PeerProgress>,
}
  57. pub(crate) fn create(
  58. peer_size: usize,
  59. ) -> (SyncLogEntriesComms, SyncLogEntriesDaemon) {
  60. let (tx, rx) = std::sync::mpsc::channel();
  61. let tx = SharedSender::new(tx);
  62. let peer_progress = (0..peer_size).map(PeerProgress::create).collect();
  63. (
  64. SyncLogEntriesComms { tx },
  65. SyncLogEntriesDaemon { rx, peer_progress },
  66. )
  67. }
/// The RPC (if any) chosen for one round of syncing to a peer.
enum SyncLogEntriesOperation<Command> {
    // Ship log entries following a known prefix of the peer's log.
    AppendEntries(AppendEntriesArgs<Command>),
    // The needed entries were compacted away; ship the snapshot instead.
    InstallSnapshot(InstallSnapshotArgs),
    // Nothing to send (not the leader, or the peer is fully caught up).
    None,
}
/// Decoded outcome of an `AppendEntries` or `InstallSnapshot` RPC.
enum SyncLogEntriesResult {
    // The reply carried a different term: ours has elapsed.
    TermElapsed(Term),
    // The peer has archived (snapshotted) past the request; carries the
    // peer's reported commit position.
    Archived(IndexTerm),
    // The peer's log diverges from the request; carries the peer's
    // reported commit position. Never produced for InstallSnapshot.
    Diverged(IndexTerm),
    // The request was accepted.
    Success,
}
/// Monotonically increasing id of a spawned sync task; used only in logs.
#[derive(Clone, Copy, Debug)]
struct TaskNumber(usize);
  81. // Command must be
  82. // 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
  83. // 1. clone: they are copied to the persister.
  84. // 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
  85. // 3. serialize: they are converted to bytes to persist.
  86. impl<Command: ReplicableCommand> Raft<Command> {
    /// Runs a daemon thread that syncs log entries to peers.
    ///
    /// This daemon watches the `new_log_entry` channel. Each item delivered by
    /// the channel is a request to sync log entries to either a peer
    /// (`Some(peer_index)`) or all peers (`None`).
    ///
    /// The daemon tries to collapse requests about the same peer together. A
    /// new task is only scheduled when the pending requests number turns from
    /// zero to one. Even then there could still be more than one task syncing
    /// logs to the same peer at the same time.
    ///
    /// New tasks will still be scheduled when we are not the leader. The task
    /// will exit without doing anything in that case.
    ///
    /// See comments on [`Raft::sync_log_entries`] to learn about the syncing
    /// and backoff strategy.
    pub(crate) fn run_log_entry_daemon(
        &self,
        SyncLogEntriesDaemon { rx, peer_progress }: SyncLogEntriesDaemon,
    ) -> impl FnOnce() {
        // Clone everything that the thread needs.
        let this = self.clone();
        move || {
            log::info!("{:?} sync log entries daemon running ...", this.me);
            let mut task_number = 0;
            while let Ok(event) = rx.recv() {
                // Shut down promptly even if events are still queued.
                if !this.keep_running.load(Ordering::Relaxed) {
                    break;
                }
                // Not the leader: drop the event without scheduling anything.
                if !this.inner_state.lock().is_leader() {
                    continue;
                }
                for (i, rpc_client) in this.peers.iter().enumerate() {
                    // Never sync to ourselves; only schedule for the peers
                    // that this event targets.
                    if i != this.me.0 && event.should_schedule(Peer(i)) {
                        let progress = &peer_progress[i];
                        if let Event::NewTerm(_term, index) = event {
                            progress.reset_progress(index);
                        }
                        // Only schedule a new task if the last task has cleared
                        // the queue of RPC requests.
                        if progress.should_schedule() {
                            task_number += 1;
                            this.thread_pool.spawn(Self::sync_log_entries(
                                this.inner_state.clone(),
                                rpc_client.clone(),
                                this.sync_log_entries_comms.clone(),
                                progress.clone(),
                                this.apply_command_signal.clone(),
                                this.beat_ticker(i),
                                TaskNumber(task_number),
                            ));
                        }
                    }
                }
            }
            log::info!("{:?} sync log entries daemon done.", this.me);
        }
    }
  145. /// Syncs log entries to a peer once, requests a new sync if that fails.
  146. ///
  147. /// Sends an `AppendEntries` request if the planned next log entry to sync
  148. /// is after log start (and thus not covered by the log snapshot). Sends an
  149. /// `InstallSnapshot` request otherwise. The responses of those two types of
  150. /// requests are handled in a similar way.
  151. ///
  152. /// The peer might respond with
  153. /// * Success. Updates the internal record of how much log the peer holds.
  154. /// Marks new log entries as committed if we have a quorum of peers that
  155. /// have persisted the log entries. Note that we do not check if there are
  156. /// more items that can be sent to the peer. A new task will be scheduled
  157. /// for that outside of this daemon.
  158. ///
  159. /// * Nothing at all. A new request to sync log entries will be added to the
  160. /// `new_log_entry` queue.
  161. ///
  162. /// * The log has diverged. The peer disagrees with the request. We'll move
  163. /// the "planned next log entry to sync" towards the log start and request
  164. /// to sync again via the `new_log_entry` queue. The backoff will be
  165. /// exponential until it exceeds the log start, at which point the request
  166. /// becomes a `InstallSnapshot`. Note this case is impossible in a response
  167. /// to a `InstallSnapshot` RPC.
  168. ///
  169. /// * The log entry has been archived. The peer has taken a snapshot at that
  170. /// position and thus cannot verify the request. Along with this response,
  171. /// the peer sends back its commit index, which is guaranteed not to be
  172. /// shadowed by that snapshot. The sync will be retried at that commit index
  173. /// via the `new_log_entry` queue. Note the follow up sync could still fail
  174. /// for the same reason, as the peer might have moved its commit index.
  175. /// However syncing will eventually succeed, since the peer cannot move its
  176. /// commit index indefinitely without accepting any log sync requests from
  177. /// the leader.
  178. ///
  179. /// In the last two cases, the "planned next index to sync" can move towards
  180. /// the log start and end, respectively. We will not move back and forth in
  181. /// a mixed sequence of those two failures. The reasoning is that after a
  182. /// failure of the last case, we will never hit the other failure again,
  183. /// since in the last case we always sync log entry at a committed index,
  184. /// and a committed log entry can never diverge.
  185. #[allow(clippy::too_many_arguments)]
  186. async fn sync_log_entries(
  187. rf: Arc<Mutex<RaftState<Command>>>,
  188. rpc_client: impl RemoteRaft<Command>,
  189. comms: SyncLogEntriesComms,
  190. progress: PeerProgress,
  191. apply_command_signal: Arc<Condvar>,
  192. beat_ticker: DaemonBeatTicker,
  193. task_number: TaskNumber,
  194. ) {
  195. if !progress.take_task() {
  196. return;
  197. }
  198. let peer = progress.peer;
  199. let operation =
  200. Self::build_sync_log_entries(&rf, &progress, task_number);
  201. let (term, prev_log_index, match_index, succeeded) = match operation {
  202. SyncLogEntriesOperation::AppendEntries(args) => {
  203. let term = args.term;
  204. let prev_log_index = args.prev_log_index;
  205. let match_index = args.prev_log_index + args.entries.len();
  206. let succeeded =
  207. Self::append_entries(&rpc_client, args, beat_ticker).await;
  208. (term, prev_log_index, match_index, succeeded)
  209. }
  210. SyncLogEntriesOperation::InstallSnapshot(args) => {
  211. let term = args.term;
  212. let prev_log_index = args.last_included_index;
  213. let match_index = args.last_included_index;
  214. let succeeded =
  215. Self::install_snapshot(&rpc_client, args, beat_ticker)
  216. .await;
  217. (term, prev_log_index, match_index, succeeded)
  218. }
  219. SyncLogEntriesOperation::None => return,
  220. };
  221. match succeeded {
  222. Ok(SyncLogEntriesResult::Success) => {
  223. let mut rf = rf.lock();
  224. if !rf.is_leader() {
  225. return;
  226. }
  227. if rf.current_term != term {
  228. return;
  229. }
  230. check_or_record!(
  231. match_index < rf.log.end(),
  232. ErrorKind::LeaderLogShrunk(match_index),
  233. "The leader log shrunk",
  234. &rf
  235. );
  236. progress.record_success(match_index);
  237. let peer_index = peer.0;
  238. if match_index > rf.match_index[peer_index] {
  239. rf.match_index[peer_index] = match_index;
  240. let mut matched = rf.match_index.to_vec();
  241. let mid = matched.len() / 2 + 1;
  242. matched.sort_unstable();
  243. let new_commit_index = matched[mid];
  244. if new_commit_index > rf.commit_index
  245. && rf.log.at(new_commit_index).term == rf.current_term
  246. {
  247. log::info!(
  248. "{:?} moving leader commit index to {} in {:?}",
  249. rf.leader_id,
  250. new_commit_index,
  251. task_number
  252. );
  253. // COMMIT_INDEX_INVARIANT, SNAPSHOT_INDEX_INVARIANT:
  254. // Index new_commit_index exists in the log array,
  255. // which implies new_commit_index is in range
  256. // [log.start(), log.end()).
  257. rf.commit_index = new_commit_index;
  258. apply_command_signal.notify_one();
  259. }
  260. }
  261. // After each round of install snapshot, we must schedule
  262. // another round of append entries. The extra round must be run
  263. // even if match_index did not move after the snapshot is
  264. // installed.
  265. // For example,
  266. // 1. Leader committed index 10, received another request at
  267. // index 11.
  268. // 2. Leader sends append entries to all peers.
  269. // 3. Leader commits index 11. At this time, append entries to
  270. // peer X has not returned, while other append entries have.
  271. // 4. Leader snapshots index 11, received another request at
  272. // index 12.
  273. // 5. Leader needs to update peer X, but does not have the
  274. // commit at index 11 any more. Leader then sends install
  275. // snapshot to peer X at index 11.
  276. // 6. The original append entries request to peer X returns
  277. // successfully, moving match_index to 11.
  278. // 7. The install snapshot request returns successfully.
  279. //
  280. // The installed snapshot is at index 11, which is already sent
  281. // by the previous append entries request. Thus the match index
  282. // did not move. Still, we need the extra round of append
  283. // entries to peer X for log entry at index 12.
  284. if prev_log_index == match_index {
  285. // If we did not make any progress this time, try again.
  286. // This can only happen when installing snapshots.
  287. comms.rerun(peer);
  288. }
  289. }
  290. Ok(SyncLogEntriesResult::Archived(committed)) => {
  291. let rf = rf.lock();
  292. check_or_record!(
  293. prev_log_index < committed.index,
  294. ErrorKind::RefusedSnapshotAfterCommitted(
  295. prev_log_index,
  296. committed.index
  297. ),
  298. format!(
  299. "{:?} misbehaves: claimed log index {} is archived, \
  300. but commit index is at {:?}) which is before that",
  301. peer, prev_log_index, committed
  302. ),
  303. &rf
  304. );
  305. Self::check_committed(&rf, peer, committed.clone());
  306. // Next index moves towards the log end. This is the only place
  307. // where that happens. committed.index should be between log
  308. // start and end, guaranteed by check_committed() above.
  309. progress.record_success(committed.index + 1);
  310. // Ignore the error. The log syncing thread must have died.
  311. comms.rerun(peer);
  312. }
  313. Ok(SyncLogEntriesResult::Diverged(committed)) => {
  314. let rf = rf.lock();
  315. check_or_record!(
  316. prev_log_index > committed.index,
  317. ErrorKind::DivergedBeforeCommitted(
  318. prev_log_index,
  319. committed.index
  320. ),
  321. format!(
  322. "{:?} claimed log index {} does not match, \
  323. but commit index is at {:?}) which is after that.",
  324. peer, prev_log_index, committed
  325. ),
  326. &rf
  327. );
  328. Self::check_committed(&rf, peer, committed.clone());
  329. progress.record_failure(committed.index);
  330. // Ignore the error. The log syncing thread must have died.
  331. comms.rerun(peer);
  332. }
  333. // Do nothing, not our term anymore.
  334. Ok(SyncLogEntriesResult::TermElapsed(term)) => {
  335. RemoteContext::<Command>::term_marker().mark(term);
  336. }
  337. Err(_) => {
  338. tokio::time::sleep(HEARTBEAT_INTERVAL).await;
  339. // Ignore the error. The log syncing thread must have died.
  340. comms.rerun(peer);
  341. }
  342. };
  343. }
    /// Sanity-checks a commit position reported by `peer` against the
    /// leader's own log, recording a fatal error on inconsistency.
    ///
    /// Positions shadowed by the leader's snapshot (before `log.start()`)
    /// are accepted silently: there is nothing left to compare against.
    fn check_committed(
        rf: &RaftState<Command>,
        peer: Peer,
        committed: IndexTerm,
    ) {
        // Shadowed by our snapshot: cannot be verified, assume fine.
        if committed.index < rf.log.start() {
            return;
        }
        // A follower can never have committed beyond the leader's log end.
        check_or_record!(
            committed.index < rf.log.end(),
            ErrorKind::CommittedBeyondEnd(committed.index),
            format!(
                "Follower {:?} committed a log entry {:?} that is
            beyond the end of the leader log at {:?}",
                peer,
                committed,
                rf.log.end(),
            ),
            rf
        );
        // A committed entry must carry the same term as the leader's copy.
        let local_term = rf.log.at(committed.index).term;
        check_or_record!(
            committed.term == local_term,
            ErrorKind::DivergedAtCommitted(committed.index),
            format!(
                "{:?} committed log diverged at {:?}: {:?} v.s. leader {:?}",
                peer, committed.index, committed.term, local_term
            ),
            rf
        );
    }
  375. fn build_sync_log_entries(
  376. rf: &Mutex<RaftState<Command>>,
  377. progress: &PeerProgress,
  378. task_number: TaskNumber,
  379. ) -> SyncLogEntriesOperation<Command> {
  380. let rf = rf.lock();
  381. if !rf.is_leader() {
  382. return SyncLogEntriesOperation::None;
  383. }
  384. let peer = progress.peer;
  385. // To send AppendEntries request, next_index must be strictly larger
  386. // than start(). Otherwise we won't be able to know the log term of the
  387. // entry right before next_index.
  388. let next_index = progress.next_index();
  389. if next_index > rf.log.start() {
  390. if next_index < rf.log.end() {
  391. log::debug!(
  392. "{:?} building append entries {:?} from {} to {:?}",
  393. rf.leader_id,
  394. task_number,
  395. next_index - 1,
  396. peer
  397. );
  398. SyncLogEntriesOperation::AppendEntries(
  399. Self::build_append_entries(&rf, next_index),
  400. )
  401. } else {
  402. log::debug!(
  403. "{:?} nothing in append entries {:?} to {:?}",
  404. rf.leader_id,
  405. task_number,
  406. peer
  407. );
  408. SyncLogEntriesOperation::None
  409. }
  410. } else {
  411. log::debug!(
  412. "{:?} installing snapshot {:?} at {} to {:?}",
  413. rf.leader_id,
  414. task_number,
  415. rf.log.first_index_term().index,
  416. peer,
  417. );
  418. SyncLogEntriesOperation::InstallSnapshot(
  419. Self::build_install_snapshot(&rf),
  420. )
  421. }
  422. }
  423. fn build_append_entries(
  424. rf: &RaftState<Command>,
  425. next_index: Index,
  426. ) -> AppendEntriesArgs<Command> {
  427. // It is guaranteed that next_index <= rf.log.end(). Panic otherwise.
  428. let prev_log_index = next_index - 1;
  429. let prev_log_term = rf.log.at(prev_log_index).term;
  430. AppendEntriesArgs {
  431. term: rf.current_term,
  432. leader_id: rf.leader_id,
  433. prev_log_index,
  434. prev_log_term,
  435. entries: rf.log.after(next_index).to_vec(),
  436. leader_commit: rf.commit_index,
  437. }
  438. }
  439. const APPEND_ENTRIES_RETRY: usize = 1;
  440. async fn append_entries(
  441. rpc_client: &dyn RemoteRaft<Command>,
  442. args: AppendEntriesArgs<Command>,
  443. beat_ticker: DaemonBeatTicker,
  444. ) -> std::io::Result<SyncLogEntriesResult> {
  445. let term = args.term;
  446. let beat = beat_ticker.next_beat();
  447. let reply = retry_rpc(
  448. Self::APPEND_ENTRIES_RETRY,
  449. RPC_DEADLINE,
  450. move |_round| rpc_client.append_entries(args.clone()),
  451. )
  452. .await?;
  453. Ok(if reply.term == term {
  454. beat_ticker.tick(beat);
  455. if let Some(committed) = reply.committed {
  456. if reply.success {
  457. SyncLogEntriesResult::Archived(committed)
  458. } else {
  459. SyncLogEntriesResult::Diverged(committed)
  460. }
  461. } else {
  462. SyncLogEntriesResult::Success
  463. }
  464. } else {
  465. SyncLogEntriesResult::TermElapsed(reply.term)
  466. })
  467. }
  468. fn build_install_snapshot(rf: &RaftState<Command>) -> InstallSnapshotArgs {
  469. let (last, snapshot) = rf.log.snapshot();
  470. InstallSnapshotArgs {
  471. term: rf.current_term,
  472. leader_id: rf.leader_id,
  473. last_included_index: last.index,
  474. last_included_term: last.term,
  475. data: snapshot.to_owned(),
  476. offset: 0,
  477. done: true,
  478. }
  479. }
  480. const INSTALL_SNAPSHOT_RETRY: usize = 1;
  481. async fn install_snapshot(
  482. rpc_client: &dyn RemoteRaft<Command>,
  483. args: InstallSnapshotArgs,
  484. beat_ticker: DaemonBeatTicker,
  485. ) -> std::io::Result<SyncLogEntriesResult> {
  486. let term = args.term;
  487. let beat = beat_ticker.next_beat();
  488. let reply = retry_rpc(
  489. Self::INSTALL_SNAPSHOT_RETRY,
  490. RPC_DEADLINE,
  491. move |_round| rpc_client.install_snapshot(args.clone()),
  492. )
  493. .await?;
  494. Ok(if reply.term == term {
  495. beat_ticker.tick(beat);
  496. if let Some(committed) = reply.committed {
  497. SyncLogEntriesResult::Archived(committed)
  498. } else {
  499. SyncLogEntriesResult::Success
  500. }
  501. } else {
  502. SyncLogEntriesResult::TermElapsed(reply.term)
  503. })
  504. }
  505. }