//! sync_log_entries.rs
  1. use std::sync::atomic::{AtomicUsize, Ordering};
  2. use std::sync::Arc;
  3. use parking_lot::{Condvar, Mutex};
  4. use crate::daemon_env::{Daemon, ErrorKind};
  5. use crate::heartbeats::HEARTBEAT_INTERVAL;
  6. use crate::term_marker::TermMarker;
  7. use crate::utils::{retry_rpc, SharedSender, RPC_DEADLINE};
  8. use crate::verify_authority::DaemonBeatTicker;
  9. use crate::{
  10. check_or_record, AppendEntriesArgs, Index, IndexTerm, InstallSnapshotArgs,
  11. Peer, Raft, RaftState, RemoteRaft, ReplicableCommand, Term,
  12. };
/// Requests delivered to the sync-log-entries daemon.
#[derive(Clone)]
enum Event {
    // A new term was observed; scheduled for every peer (see
    // `should_schedule`). Sent from outside this file.
    NewTerm(Term),
    // A new log entry at this index should be replicated; scheduled for
    // every peer.
    NewLogEntry(Index),
    // Retry syncing to one specific peer, e.g. after a failed RPC.
    Rerun(Peer),
    // Stop the daemon loop.
    Shutdown,
}
  20. impl Event {
  21. fn should_schedule(&self, peer: Peer) -> bool {
  22. match self {
  23. Event::NewTerm(_term) => true,
  24. Event::NewLogEntry(_index) => true,
  25. Event::Rerun(p) => p == &peer,
  26. Event::Shutdown => false,
  27. }
  28. }
  29. }
/// Cloneable handle for sending requests to the sync-log-entries daemon.
#[derive(Clone)]
pub(crate) struct SyncLogEntriesComms {
    // Shared sender side of the channel drained by `SyncLogEntriesDaemon`.
    tx: crate::utils::SharedSender<Event>,
}
  34. impl SyncLogEntriesComms {
  35. pub fn update_followers(&self, index: Index) {
  36. // Ignore the error. The log syncing thread must have died.
  37. let _ = self.tx.send(Event::NewLogEntry(index));
  38. }
  39. pub fn kill(&self) {
  40. self.tx
  41. .send(Event::Shutdown)
  42. .expect("The sync log entries daemon should still be alive");
  43. }
  44. fn rerun(&self, peer_index: usize) {
  45. // Ignore the error. The log syncing thread must have died.
  46. let _ = self.tx.send(Event::Rerun(Peer(peer_index)));
  47. }
  48. }
/// Receiving end owned by the daemon thread; consumed by
/// [`Raft::run_log_entry_daemon`].
pub(crate) struct SyncLogEntriesDaemon {
    // Queue of sync requests produced by `SyncLogEntriesComms`.
    rx: std::sync::mpsc::Receiver<Event>,
}
  52. pub(crate) fn create() -> (SyncLogEntriesComms, SyncLogEntriesDaemon) {
  53. let (tx, rx) = std::sync::mpsc::channel();
  54. let tx = SharedSender::new(tx);
  55. (SyncLogEntriesComms { tx }, SyncLogEntriesDaemon { rx })
  56. }
// Count of pending sync requests for one peer. The 64-byte alignment keeps
// each counter on its own cache line — presumably to avoid false sharing
// between the per-peer atomics stored contiguously in a Vec (TODO confirm).
#[repr(align(64))]
struct Opening(Arc<AtomicUsize>);
/// What to send to a peer in one sync round, as decided by
/// `build_sync_log_entries`.
enum SyncLogEntriesOperation<Command> {
    // Replicate log entries starting right after `prev_log_index`.
    AppendEntries(AppendEntriesArgs<Command>),
    // The peer is too far behind the log start: ship the whole snapshot.
    InstallSnapshot(InstallSnapshotArgs),
    // Nothing to send (peer up to date, or we are not the leader).
    None,
}
/// Outcome of one `AppendEntries` / `InstallSnapshot` RPC round.
enum SyncLogEntriesResult {
    // The reply carried a term newer than ours; we are stale.
    TermElapsed(Term),
    // The peer archived (snapshotted) past the referenced index; carries the
    // peer's commit index.
    Archived(IndexTerm),
    // The peer's log disagrees at the referenced index; carries the peer's
    // commit index. Only possible for `AppendEntries`.
    Diverged(IndexTerm),
    // The peer accepted the request.
    Success,
}
// Monotonically increasing id of a sync task, used only in log messages to
// correlate a task's start with its outcome.
#[derive(Clone, Copy, Debug)]
struct TaskNumber(usize);
// Command must be
// 0. 'static: Raft<Command> must be 'static, it is moved to another thread.
// 1. clone: they are copied to the persister.
// 2. send: Arc<Mutex<Vec<LogEntry<Command>>>> must be send, it is moved to another thread.
// 3. serialize: they are converted to bytes to persist.
  77. impl<Command: ReplicableCommand> Raft<Command> {
    /// Runs a daemon thread that syncs log entries to peers.
    ///
    /// This daemon drains the channel of [`Event`]s. `NewTerm` and
    /// `NewLogEntry` request a sync to all peers; `Rerun` requests a sync to
    /// one specific peer; `Shutdown` (or `keep_running` turning false) stops
    /// the daemon.
    ///
    /// The daemon tries to collapse requests about the same peer together. A
    /// new task is only scheduled when the pending requests number turns from
    /// zero to one. Even then there could still be more than one task syncing
    /// logs to the same peer at the same time.
    ///
    /// New tasks will still be scheduled when we are not the leader. The task
    /// will exit without doing anything in that case.
    ///
    /// See comments on [`Raft::sync_log_entries`] to learn about the syncing
    /// and backoff strategy.
    pub(crate) fn run_log_entry_daemon(
        &self,
        SyncLogEntriesDaemon { rx }: SyncLogEntriesDaemon,
    ) {
        // Clone everything that the thread needs.
        let this = self.clone();
        let sync_log_entry_daemon = move || {
            log::info!("{:?} sync log entries daemon running ...", this.me);
            // One pending-request counter per peer. The entry for `me` exists
            // but is never scheduled (the loop below skips i == me).
            let mut openings = vec![];
            openings.resize_with(this.peers.len(), || {
                Opening(Arc::new(AtomicUsize::new(0)))
            });
            let openings = openings; // Not mutable beyond this point.
            let mut task_number = 0;
            while let Ok(event) = rx.recv() {
                if !this.keep_running.load(Ordering::Relaxed) {
                    break;
                }
                // Only the leader pushes log entries to peers.
                if !this.inner_state.lock().is_leader() {
                    continue;
                }
                for (i, rpc_client) in this.peers.iter().enumerate() {
                    if i != this.me.0 && event.should_schedule(Peer(i)) {
                        // Only schedule a new task if the last task has cleared
                        // the queue of RPC requests: fetch_add returning 0
                        // means we are the first pending request since the
                        // last task consumed the counter with swap(0).
                        if openings[i].0.fetch_add(1, Ordering::AcqRel) == 0 {
                            task_number += 1;
                            this.thread_pool.spawn(Self::sync_log_entries(
                                this.inner_state.clone(),
                                rpc_client.clone(),
                                i,
                                this.sync_log_entries_comms.clone(),
                                openings[i].0.clone(),
                                this.apply_command_signal.clone(),
                                this.term_marker(),
                                this.beat_ticker(i),
                                TaskNumber(task_number),
                            ));
                        }
                    }
                }
            }
            log::info!("{:?} sync log entries daemon done.", this.me);
        };
        self.daemon_env
            .watch_daemon(Daemon::SyncLogEntries, sync_log_entry_daemon);
    }
  141. /// Syncs log entries to a peer once, requests a new sync if that fails.
  142. ///
  143. /// Sends an `AppendEntries` request if the planned next log entry to sync
  144. /// is after log start (and thus not covered by the log snapshot). Sends an
  145. /// `InstallSnapshot` request otherwise. The responses of those two types of
  146. /// requests are handled in a similar way.
  147. ///
  148. /// The peer might respond with
  149. /// * Success. Updates the internal record of how much log the peer holds.
  150. /// Marks new log entries as committed if we have a quorum of peers that
  151. /// have persisted the log entries. Note that we do not check if there are
  152. /// more items that can be sent to the peer. A new task will be scheduled
  153. /// for that outside of this daemon.
  154. ///
  155. /// * Nothing at all. A new request to sync log entries will be added to the
  156. /// `new_log_entry` queue.
  157. ///
  158. /// * The log has diverged. The peer disagrees with the request. We'll move
  159. /// the "planned next log entry to sync" towards the log start and request
  160. /// to sync again via the `new_log_entry` queue. The backoff will be
  161. /// exponential until it exceeds the log start, at which point the request
  162. /// becomes a `InstallSnapshot`. Note this case is impossible in a response
  163. /// to a `InstallSnapshot` RPC.
  164. ///
  165. /// * The log entry has been archived. The peer has taken a snapshot at that
  166. /// position and thus cannot verify the request. Along with this response,
  167. /// the peer sends back its commit index, which is guaranteed not to be
  168. /// shadowed by that snapshot. The sync will be retried at that commit index
  169. /// via the `new_log_entry` queue. Note the follow up sync could still fail
  170. /// for the same reason, as the peer might have moved its commit index.
  171. /// However syncing will eventually succeed, since the peer cannot move its
  172. /// commit index indefinitely without accepting any log sync requests from
  173. /// the leader.
  174. ///
  175. /// In the last two cases, the "planned next index to sync" can move towards
  176. /// the log start and end, respectively. We will not move back and forth in
  177. /// a mixed sequence of those two failures. The reasoning is that after a
  178. /// failure of the last case, we will never hit the other failure again,
  179. /// since in the last case we always sync log entry at a committed index,
  180. /// and a committed log entry can never diverge.
  181. #[allow(clippy::too_many_arguments)]
  182. async fn sync_log_entries(
  183. rf: Arc<Mutex<RaftState<Command>>>,
  184. rpc_client: impl RemoteRaft<Command>,
  185. peer_index: usize,
  186. comms: SyncLogEntriesComms,
  187. opening: Arc<AtomicUsize>,
  188. apply_command_signal: Arc<Condvar>,
  189. term_marker: TermMarker<Command>,
  190. beat_ticker: DaemonBeatTicker,
  191. task_number: TaskNumber,
  192. ) {
  193. if opening.swap(0, Ordering::AcqRel) == 0 {
  194. return;
  195. }
  196. let operation =
  197. Self::build_sync_log_entries(&rf, peer_index, task_number);
  198. let (term, prev_log_index, match_index, succeeded) = match operation {
  199. SyncLogEntriesOperation::AppendEntries(args) => {
  200. let term = args.term;
  201. let prev_log_index = args.prev_log_index;
  202. let match_index = args.prev_log_index + args.entries.len();
  203. let succeeded =
  204. Self::append_entries(&rpc_client, args, beat_ticker).await;
  205. (term, prev_log_index, match_index, succeeded)
  206. }
  207. SyncLogEntriesOperation::InstallSnapshot(args) => {
  208. let term = args.term;
  209. let prev_log_index = args.last_included_index;
  210. let match_index = args.last_included_index;
  211. let succeeded =
  212. Self::install_snapshot(&rpc_client, args, beat_ticker)
  213. .await;
  214. (term, prev_log_index, match_index, succeeded)
  215. }
  216. SyncLogEntriesOperation::None => return,
  217. };
  218. let peer = Peer(peer_index);
  219. match succeeded {
  220. Ok(SyncLogEntriesResult::Success) => {
  221. let mut rf = rf.lock();
  222. if !rf.is_leader() {
  223. return;
  224. }
  225. if rf.current_term != term {
  226. return;
  227. }
  228. check_or_record!(
  229. match_index < rf.log.end(),
  230. ErrorKind::LeaderLogShrunk(match_index),
  231. "The leader log shrunk",
  232. &rf
  233. );
  234. rf.next_index[peer_index] = match_index + 1;
  235. rf.current_step[peer_index] = 0;
  236. if match_index > rf.match_index[peer_index] {
  237. rf.match_index[peer_index] = match_index;
  238. let mut matched = rf.match_index.to_vec();
  239. let mid = matched.len() / 2 + 1;
  240. matched.sort_unstable();
  241. let new_commit_index = matched[mid];
  242. if new_commit_index > rf.commit_index
  243. && rf.log.at(new_commit_index).term == rf.current_term
  244. {
  245. log::info!(
  246. "{:?} moving leader commit index to {} in {:?}",
  247. rf.leader_id,
  248. new_commit_index,
  249. task_number
  250. );
  251. // COMMIT_INDEX_INVARIANT, SNAPSHOT_INDEX_INVARIANT:
  252. // Index new_commit_index exists in the log array,
  253. // which implies new_commit_index is in range
  254. // [log.start(), log.end()).
  255. rf.commit_index = new_commit_index;
  256. apply_command_signal.notify_one();
  257. }
  258. }
  259. // After each round of install snapshot, we must schedule
  260. // another round of append entries. The extra round must be run
  261. // even if match_index did not move after the snapshot is
  262. // installed.
  263. // For example,
  264. // 1. Leader committed index 10, received another request at
  265. // index 11.
  266. // 2. Leader sends append entries to all peers.
  267. // 3. Leader commits index 11. At this time, append entries to
  268. // peer X has not returned, while other append entries have.
  269. // 4. Leader snapshots index 11, received another request at
  270. // index 12.
  271. // 5. Leader needs to update peer X, but does not have the
  272. // commit at index 11 any more. Leader then sends install
  273. // snapshot to peer X at index 11.
  274. // 6. The original append entries request to peer X returns
  275. // successfully, moving match_index to 11.
  276. // 7. The install snapshot request returns successfully.
  277. //
  278. // The installed snapshot is at index 11, which is already sent
  279. // by the previous append entries request. Thus the match index
  280. // did not move. Still, we need the extra round of append
  281. // entries to peer X for log entry at index 12.
  282. if prev_log_index == match_index {
  283. // If we did not make any progress this time, try again.
  284. // This can only happen when installing snapshots.
  285. comms.rerun(peer_index);
  286. }
  287. }
  288. Ok(SyncLogEntriesResult::Archived(committed)) => {
  289. let mut rf = rf.lock();
  290. check_or_record!(
  291. prev_log_index < committed.index,
  292. ErrorKind::RefusedSnapshotAfterCommitted(
  293. prev_log_index,
  294. committed.index
  295. ),
  296. format!(
  297. "Peer {} misbehaves: claimed log index {} is archived, \
  298. but commit index is at {:?}) which is before that",
  299. peer_index, prev_log_index, committed
  300. ),
  301. &rf
  302. );
  303. Self::check_committed(&rf, peer, committed.clone());
  304. rf.current_step[peer_index] = 0;
  305. // Next index moves towards the log end. This is the only place
  306. // where that happens. committed.index should be between log
  307. // start and end, guaranteed by check_committed() above.
  308. rf.next_index[peer_index] = committed.index + 1;
  309. // Ignore the error. The log syncing thread must have died.
  310. comms.rerun(peer_index);
  311. }
  312. Ok(SyncLogEntriesResult::Diverged(committed)) => {
  313. let mut rf = rf.lock();
  314. check_or_record!(
  315. prev_log_index > committed.index,
  316. ErrorKind::DivergedBeforeCommitted(
  317. prev_log_index,
  318. committed.index
  319. ),
  320. format!(
  321. "Peer {} claimed log index {} does not match, \
  322. but commit index is at {:?}) which is after that.",
  323. peer_index, prev_log_index, committed
  324. ),
  325. &rf
  326. );
  327. Self::check_committed(&rf, peer, committed.clone());
  328. let step = &mut rf.current_step[peer_index];
  329. if *step < 5 {
  330. *step += 1;
  331. }
  332. let diff = 4 << *step;
  333. let next_index = &mut rf.next_index[peer_index];
  334. if diff >= *next_index {
  335. *next_index = 1usize;
  336. } else {
  337. *next_index -= diff;
  338. }
  339. if *next_index < committed.index {
  340. *next_index = committed.index;
  341. }
  342. // Ignore the error. The log syncing thread must have died.
  343. comms.rerun(peer_index);
  344. }
  345. // Do nothing, not our term anymore.
  346. Ok(SyncLogEntriesResult::TermElapsed(term)) => {
  347. term_marker.mark(term);
  348. }
  349. Err(_) => {
  350. tokio::time::sleep(HEARTBEAT_INTERVAL).await;
  351. // Ignore the error. The log syncing thread must have died.
  352. comms.rerun(peer_index);
  353. }
  354. };
  355. }
    /// Validates a commit index reported by a follower against the leader log.
    ///
    /// An index shadowed by the leader's own snapshot (before `log.start()`)
    /// cannot be checked and is accepted as-is. Otherwise this records a
    /// fatal error if the reported index is at or beyond the leader log end,
    /// or if the term stored at that index diverges from the leader's.
    fn check_committed(
        rf: &RaftState<Command>,
        peer: Peer,
        committed: IndexTerm,
    ) {
        // Shadowed by the leader snapshot: nothing to verify.
        if committed.index < rf.log.start() {
            return;
        }
        check_or_record!(
            committed.index < rf.log.end(),
            ErrorKind::CommittedBeyondEnd(committed.index),
            format!(
                "Follower {:?} committed a log entry {:?} that is
                beyond the end of the leader log at {:?}",
                peer,
                committed,
                rf.log.end(),
            ),
            rf
        );
        // A committed entry must agree with the leader's entry at the same
        // index (Raft log matching).
        let local_term = rf.log.at(committed.index).term;
        check_or_record!(
            committed.term == local_term,
            ErrorKind::DivergedAtCommitted(committed.index),
            format!(
                "{:?} committed log diverged at {:?}: {:?} v.s. leader {:?}",
                peer, committed.index, committed.term, local_term
            ),
            rf
        );
    }
    /// Decides what to send to a peer in this sync round.
    ///
    /// Returns `AppendEntries` when the peer's next index is still covered by
    /// the log array, `InstallSnapshot` when it falls at or before the log
    /// start, and `None` when the peer is fully caught up or we are no longer
    /// the leader.
    fn build_sync_log_entries(
        rf: &Mutex<RaftState<Command>>,
        peer_index: usize,
        task_number: TaskNumber,
    ) -> SyncLogEntriesOperation<Command> {
        let rf = rf.lock();
        if !rf.is_leader() {
            return SyncLogEntriesOperation::None;
        }
        // To send AppendEntries request, next_index must be strictly larger
        // than start(). Otherwise we won't be able to know the log term of the
        // entry right before next_index.
        if rf.next_index[peer_index] > rf.log.start() {
            if rf.next_index[peer_index] < rf.log.end() {
                log::debug!(
                    "{:?} building append entries {:?} from {} to {:?}",
                    rf.leader_id,
                    task_number,
                    rf.next_index[peer_index] - 1,
                    Peer(peer_index)
                );
                SyncLogEntriesOperation::AppendEntries(
                    Self::build_append_entries(&rf, peer_index),
                )
            } else {
                // next_index == log.end(): the peer holds everything we have.
                log::debug!(
                    "{:?} nothing in append entries {:?} to {:?}",
                    rf.leader_id,
                    task_number,
                    Peer(peer_index)
                );
                SyncLogEntriesOperation::None
            }
        } else {
            // next_index <= log.start(): the entries the peer needs are only
            // available in the snapshot.
            log::debug!(
                "{:?} installing snapshot {:?} at {} to {:?}",
                rf.leader_id,
                task_number,
                rf.log.first_index_term().index,
                Peer(peer_index)
            );
            SyncLogEntriesOperation::InstallSnapshot(
                Self::build_install_snapshot(&rf),
            )
        }
    }
  433. fn build_append_entries(
  434. rf: &RaftState<Command>,
  435. peer_index: usize,
  436. ) -> AppendEntriesArgs<Command> {
  437. // It is guaranteed that next_index <= rf.log.end(). Panic otherwise.
  438. let prev_log_index = rf.next_index[peer_index] - 1;
  439. let prev_log_term = rf.log.at(prev_log_index).term;
  440. AppendEntriesArgs {
  441. term: rf.current_term,
  442. leader_id: rf.leader_id,
  443. prev_log_index,
  444. prev_log_term,
  445. entries: rf.log.after(rf.next_index[peer_index]).to_vec(),
  446. leader_commit: rf.commit_index,
  447. }
  448. }
  449. const APPEND_ENTRIES_RETRY: usize = 1;
  450. async fn append_entries(
  451. rpc_client: &dyn RemoteRaft<Command>,
  452. args: AppendEntriesArgs<Command>,
  453. beat_ticker: DaemonBeatTicker,
  454. ) -> std::io::Result<SyncLogEntriesResult> {
  455. let term = args.term;
  456. let beat = beat_ticker.next_beat();
  457. let reply = retry_rpc(
  458. Self::APPEND_ENTRIES_RETRY,
  459. RPC_DEADLINE,
  460. move |_round| rpc_client.append_entries(args.clone()),
  461. )
  462. .await?;
  463. Ok(if reply.term == term {
  464. beat_ticker.tick(beat);
  465. if let Some(committed) = reply.committed {
  466. if reply.success {
  467. SyncLogEntriesResult::Archived(committed)
  468. } else {
  469. SyncLogEntriesResult::Diverged(committed)
  470. }
  471. } else {
  472. SyncLogEntriesResult::Success
  473. }
  474. } else {
  475. SyncLogEntriesResult::TermElapsed(reply.term)
  476. })
  477. }
  478. fn build_install_snapshot(rf: &RaftState<Command>) -> InstallSnapshotArgs {
  479. let (last, snapshot) = rf.log.snapshot();
  480. InstallSnapshotArgs {
  481. term: rf.current_term,
  482. leader_id: rf.leader_id,
  483. last_included_index: last.index,
  484. last_included_term: last.term,
  485. data: snapshot.to_owned(),
  486. offset: 0,
  487. done: true,
  488. }
  489. }
  490. const INSTALL_SNAPSHOT_RETRY: usize = 1;
  491. async fn install_snapshot(
  492. rpc_client: &dyn RemoteRaft<Command>,
  493. args: InstallSnapshotArgs,
  494. beat_ticker: DaemonBeatTicker,
  495. ) -> std::io::Result<SyncLogEntriesResult> {
  496. let term = args.term;
  497. let beat = beat_ticker.next_beat();
  498. let reply = retry_rpc(
  499. Self::INSTALL_SNAPSHOT_RETRY,
  500. RPC_DEADLINE,
  501. move |_round| rpc_client.install_snapshot(args.clone()),
  502. )
  503. .await?;
  504. Ok(if reply.term == term {
  505. beat_ticker.tick(beat);
  506. if let Some(committed) = reply.committed {
  507. SyncLogEntriesResult::Archived(committed)
  508. } else {
  509. SyncLogEntriesResult::Success
  510. }
  511. } else {
  512. SyncLogEntriesResult::TermElapsed(reply.term)
  513. })
  514. }
  515. }