@@ -39,6 +39,22 @@ impl<Command> Raft<Command>
 where
     Command: 'static + Clone + Send + serde::Serialize + Default,
 {
+    /// Runs a daemon thread that syncs log entries to peers.
+    ///
+    /// This daemon watches the `new_log_entry` channel. Each item delivered by
+    /// the channel is a request to sync log entries to either a peer
+    /// (`Some(peer_index)`) or all peers (`None`).
+    ///
+    /// The daemon tries to collapse requests about the same peer together. A
+    /// new task is only scheduled when the number of pending requests turns
+    /// from zero to one. Even then there could still be more than one task
+    /// syncing logs to the same peer at the same time.
+    ///
+    /// New tasks will still be scheduled when we are not the leader. The task
+    /// will exit without doing anything in that case.
+    ///
+    /// See comments on [`Raft::sync_log_entries`] to learn about the syncing
+    /// and backoff strategy.
     pub(crate) fn run_log_entry_daemon(&mut self) {
         let (tx, rx) = std::sync::mpsc::channel::<Option<Peer>>();
         self.new_log_entry.replace(tx);
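As a rough, self-contained sketch of the request-collapsing idea documented above (not the code in this change; `Peer` and `spawn_sync_task` are hypothetical stand-ins), a per-peer pending counter can gate task scheduling on the zero-to-one transition:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::Receiver;
use std::sync::Arc;

// Hypothetical peer identifier; the real code has its own `Peer` type.
type Peer = usize;

/// Drains the `new_log_entry`-style channel and collapses requests per peer:
/// a sync task is spawned only when a peer's pending count moves from 0 to 1.
fn run_collapsing_loop(rx: Receiver<Option<Peer>>, peers: Vec<Peer>) {
    let pending: HashMap<Peer, Arc<AtomicUsize>> = peers
        .iter()
        .map(|&p| (p, Arc::new(AtomicUsize::new(0))))
        .collect();

    // `None` asks to sync all peers; `Some(p)` targets a single peer.
    for request in rx {
        let targets: Vec<Peer> = match request {
            Some(p) => vec![p],
            None => peers.clone(),
        };
        for p in targets {
            let counter = Arc::clone(&pending[&p]);
            // Only schedule on the 0 -> 1 transition; requests arriving while
            // a task is pending just bump the counter and piggyback on it.
            if counter.fetch_add(1, Ordering::SeqCst) == 0 {
                spawn_sync_task(p, counter);
            }
        }
    }
}

/// Hypothetical stand-in for scheduling one sync attempt to `peer`. It resets
/// the counter first, so requests arriving afterwards schedule a fresh task.
fn spawn_sync_task(peer: Peer, counter: Arc<AtomicUsize>) {
    std::thread::spawn(move || {
        counter.store(0, Ordering::SeqCst);
        println!("syncing log entries to peer {peer}");
    });
}
```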
@@ -89,6 +105,46 @@ where
         self.daemon_env.watch_daemon(join_handle);
     }
 
+    /// Syncs log entries to a peer once, and requests a new sync if that fails.
+    ///
+    /// Sends an `AppendEntries` request if the planned next log entry to sync
+    /// is after the log start (and thus not covered by the log snapshot). Sends
+    /// an `InstallSnapshot` request otherwise. The responses of those two types
+    /// of requests are handled in a similar way.
+    ///
+    /// The peer might respond with
+    /// * Success. Updates the internal record of how much of the log the peer
+    /// holds. Marks new log entries as committed if we have a quorum of peers
+    /// that have persisted the log entries. Note that we do not check if there
+    /// are more items that can be sent to the peer. A new task will be
+    /// scheduled for that outside of this daemon.
+    ///
+    /// * Nothing at all. A new request to sync log entries will be added to the
+    /// `new_log_entry` queue.
+    ///
+    /// * The log has diverged. The peer disagrees with the request. We'll move
+    /// the "planned next log entry to sync" towards the log start and request
+    /// to sync again via the `new_log_entry` queue. The backoff will be
+    /// exponential until it moves past the log start, at which point the
+    /// request becomes an `InstallSnapshot`. Note that this case is impossible
+    /// in a response to an `InstallSnapshot` RPC.
+    ///
+    /// * The log entry has been archived. The peer has taken a snapshot at that
+    /// position and thus cannot verify the request. Along with this response,
+    /// the peer sends back its commit index, which is guaranteed not to be
+    /// shadowed by that snapshot. The sync will be retried at that commit index
+    /// via the `new_log_entry` queue. Note that the follow-up sync could still
+    /// fail for the same reason, as the peer might have moved its commit index.
+    /// However, syncing will eventually succeed, since the peer cannot move its
+    /// commit index indefinitely without accepting any log sync requests from
+    /// the leader.
+    ///
+    /// In the last two cases, the "planned next index to sync" can move towards
+    /// the log start and end, respectively. We will not move back and forth in
+    /// a mixed sequence of those two failures. The reasoning is that after a
+    /// failure of the last case, we will never hit the other failure again,
+    /// since in the last case we always sync the log entry at a committed
+    /// index, and a committed log entry can never diverge.
     async fn sync_log_entries(
         rf: Arc<Mutex<RaftState<Command>>>,
         rpc_client: Arc<RpcClient>,
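And for the retry and backoff behaviour described in the `sync_log_entries` doc comment, here is a rough sketch of how the response cases might map onto the next planned index (illustration only; `SyncResult`, `plan_next_index`, and `rpc_kind` are made-up names, not part of this change):

```rust
/// Hypothetical response cases, mirroring the doc comment above.
enum SyncResult {
    /// The peer accepted the entries (or the snapshot).
    Success,
    /// The RPC produced no response at all.
    NoResponse,
    /// The peer disagrees with the log entries at this position.
    Diverged,
    /// The peer archived this position in a snapshot; it reports its commit
    /// index, which that snapshot is guaranteed not to shadow.
    Archived { peer_commit_index: usize },
}

/// Picks the RPC for a planned index: entries after the log start can be sent
/// directly, while anything at or before it is only available via a snapshot.
fn rpc_kind(next_index: usize, log_start: usize) -> &'static str {
    if next_index > log_start {
        "AppendEntries"
    } else {
        "InstallSnapshot"
    }
}

/// Plans the index for the next sync attempt, or `None` if no immediate retry
/// is needed. `step` starts at 1 and doubles on each consecutive divergence,
/// giving the exponential backoff towards the log start.
fn plan_next_index(next_index: usize, step: &mut usize, result: SyncResult) -> Option<usize> {
    match result {
        // The entries are on the peer; committing and any further syncing are
        // requested outside this task.
        SyncResult::Success => None,
        // Retry the same position via the `new_log_entry` queue.
        SyncResult::NoResponse => Some(next_index),
        // Back off towards the log start; once the planned index falls behind
        // it, `rpc_kind` switches the request to `InstallSnapshot`.
        SyncResult::Diverged => {
            let retreated = next_index.saturating_sub(*step);
            *step *= 2;
            Some(retreated)
        }
        // Jump forward and retry at the peer's reported commit index.
        SyncResult::Archived { peer_commit_index } => Some(peer_commit_index),
    }
}
```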
|