use std::sync::atomic::Ordering; use std::time::Duration; use crate::{Index, Raft, Snapshot, HEARTBEAT_INTERVAL_MILLIS}; pub enum ApplyCommandMessage { Snapshot(Snapshot), Command(Index, Command), } pub trait ApplyCommandFnMut: 'static + Send + FnMut(ApplyCommandMessage) { } impl)> ApplyCommandFnMut for T { } impl Raft where Command: 'static + Clone + Send, { pub(crate) fn run_apply_command_daemon( &self, mut apply_command: impl ApplyCommandFnMut, ) { let keep_running = self.keep_running.clone(); let rf = self.inner_state.clone(); let condvar = self.apply_command_signal.clone(); let snapshot_daemon = self.snapshot_daemon.clone(); let stop_wait_group = self.stop_wait_group.clone(); let join_handle = std::thread::spawn(move || { while keep_running.load(Ordering::SeqCst) { let messages = { let mut rf = rf.lock(); if rf.last_applied >= rf.commit_index { condvar.wait_for( &mut rf, Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS), ); } if rf.last_applied < rf.log.start() { let (index_term, data) = rf.log.snapshot(); let messages = vec![ApplyCommandMessage::Snapshot(Snapshot { last_included_index: index_term.index, data: data.to_vec(), })]; rf.last_applied = rf.log.start(); messages } else if rf.last_applied < rf.commit_index { let index = rf.last_applied + 1; let last_one = rf.commit_index + 1; let messages: Vec> = rf .log .between(index, last_one) .iter() .map(|entry| { ApplyCommandMessage::Command( entry.index, entry.command.clone(), ) }) .collect(); rf.last_applied = rf.commit_index; messages } else { continue; } }; // Release the lock while calling external functions. for message in messages { apply_command(message); snapshot_daemon.trigger(); } } drop(stop_wait_group); }); self.daemon_env.watch_daemon(join_handle); } }